Working with PySpark¶
Note: This documentation is based on Kedro 0.15.5. If you spot anything that is incorrect, please create an issue or pull request.
In this tutorial we explain how to work with PySpark in a Kedro pipeline.
Relevant API documentation: SparkDataSet, SparkJDBCDataSet
Initialising a SparkSession¶
Before any PySpark operations are performed, you should initialise your SparkSession, typically in your application’s entry point before running the pipeline.
For example, if you are using Kedro’s project template, you could add an init_spark_session() method to the ProjectContext class in src/<your_project_name>/run.py as follows:
import getpass
from pathlib import Path
from typing import Union

from pyspark import SparkConf
from pyspark.sql import SparkSession

# ... (other imports from the project template, including KedroContext)


class ProjectContext(KedroContext):
    # ...
    def __init__(self, project_path: Union[Path, str], env: str = None):
        super().__init__(project_path, env)
        self._spark_session = None
        self.init_spark_session()

    def init_spark_session(self, yarn=True) -> None:
        """Initialises a SparkSession using the config defined in project's conf folder."""
        if self._spark_session:
            return
        parameters = self.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        spark_session_conf = (
            SparkSession.builder.appName(
                "{}_{}".format(self.project_name, getpass.getuser())
            )
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        if yarn:
            self._spark_session = spark_session_conf.master("yarn").getOrCreate()
        else:
            self._spark_session = spark_session_conf.getOrCreate()

        self._spark_session.sparkContext.setLogLevel("WARN")

    project_name = "kedro"
    project_version = "0.15.5"

    # ...
Create conf/base/spark.yml and specify the parameters as follows:
spark.driver.maxResultSize: 3g
spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
spark.sql.execution.arrow.enabled: true
spark.jars.packages: org.apache.hadoop:hadoop-aws:2.7.5
spark.jars.excludes: joda-time:joda-time
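These entries are read back by init_spark_session() through the project’s ConfigLoader, which returns a plain dictionary that SparkConf().setAll() consumes as key/value pairs. As a minimal standalone sketch (assuming the default conf/base and conf/local folders and the spark.yml above):

from kedro.config import ConfigLoader
from pyspark import SparkConf

conf_loader = ConfigLoader(["conf/base", "conf/local"])
parameters = conf_loader.get("spark*", "spark*/**")
# parameters is a flat dict, e.g. {"spark.driver.maxResultSize": "3g", ...}
spark_conf = SparkConf().setAll(parameters.items())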
Since SparkSession is a singleton, the next time you call SparkSession.builder.getOrCreate() you will be provided with the same SparkSession you initialised at your app’s entry point.
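For example, a helper called from a node (create_weather_lookup is a hypothetical function, not part of the Kedro API) can simply ask the builder for the existing session:

from pyspark.sql import SparkSession


def create_weather_lookup(cities):
    # getOrCreate() returns the session created in ProjectContext, not a new one
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(cities, ["city", "station_id"])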
Creating a SparkDataSet¶
Having created a SparkSession, you can load your data using PySpark’s DataFrameReader. To do so, please use the provided SparkDataSet:
Code API¶
import pyspark.sql

from kedro.io import DataCatalog
from kedro.contrib.io.pyspark import SparkDataSet

spark_ds = SparkDataSet(
    filepath="s3a://your_bucket/data/01_raw/weather*",
    file_format="csv",
    load_args={"header": True, "inferSchema": True},
    save_args={"sep": "|", "header": True},
)
catalog = DataCatalog({"weather": spark_ds})

df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)
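The same class also handles writes: catalog.save() hands the DataFrame to PySpark’s DataFrameWriter using the configured save_args. A minimal sketch, where the output dataset name and path are assumptions for illustration only:

weather_out = SparkDataSet(
    filepath="s3a://your_bucket/data/02_intermediate/weather",
    file_format="csv",
    save_args={"sep": "|", "header": True},
)
catalog.add("weather_out", weather_out)
catalog.save("weather_out", df)  # writes df using the save_args above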
YAML API¶
In catalog.yml:
weather:
  type: kedro.contrib.io.pyspark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather*
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True
In run.py:
import pyspark.sql

from kedro.config import ConfigLoader
from kedro.io import DataCatalog

config = ConfigLoader(["conf/base", "conf/local"])
catalog = DataCatalog.from_config(
    config.get("catalog*", "catalog*/**"),
    config.get("credentials*", "credentials*/**"),
)

df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)
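The loaded object is an ordinary PySpark DataFrame, so the usual transformations apply; for example (the temperature column name is hypothetical and only illustrates the idea):

df.printSchema()
df.filter(df["temperature"] > 25).show(5)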
Working with PySpark and Kedro pipelines¶
Continuing from the example of the previous section, since catalog.load("weather") returns a pyspark.sql.DataFrame, any Kedro pipeline nodes which have weather as an input will be provided with a PySpark dataframe:
from kedro.pipeline import Pipeline, node


def my_node(weather):
    weather.show()  # weather is a pyspark.sql.DataFrame


class ProjectContext(KedroContext):
    # ...
    @property
    def pipeline(self) -> Pipeline:
        # in a real project, my_node would be imported from your project's source code
        return Pipeline([node(my_node, "weather", None)])

    # ...
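A node can also return a pyspark.sql.DataFrame, in which case Kedro saves the result through the matching catalog entry. A minimal sketch, assuming a second SparkDataSet is registered in catalog.yml under the hypothetical name weather_cleaned:

from kedro.pipeline import Pipeline, node


def clean_weather(weather):
    # weather is a pyspark.sql.DataFrame; drop incomplete rows before saving
    return weather.dropna()


pipeline = Pipeline([node(clean_weather, "weather", "weather_cleaned")])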