Working with PySpark¶
Note: This documentation is based on Kedro 0.14.0. If you spot anything that is incorrect then please create an issue or pull request.
In this tutorial we explain how to work with PySpark in a Kedro pipeline.
Relevant API documentation: SparkDataSet
Initialising a SparkSession¶
Before any PySpark operations are performed, you should initialise your SparkSession, typically in your application’s entry point before running the pipeline. For example, if you are using Kedro’s project template, then you could create a function init_spark_session() in src/<your_project_name>/run.py as follows:
from pyspark.sql import SparkSession
from kedro.config import ConfigLoader
from kedro.runner import SequentialRunner

# ...


def init_spark_session(aws_access_key, aws_secret_key):
    # select only/add more config options as per your needs
    return (
        SparkSession.builder.master("local[*]")
        .appName("kedro")
        .config("spark.driver.memory", "4g")
        .config("spark.driver.maxResultSize", "3g")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.sql.execution.arrow.enabled", "true")
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5")
        .config("spark.jars.excludes", "joda-time:joda-time")
        .config("fs.s3a.access.key", aws_access_key)
        .config("fs.s3a.secret.key", aws_secret_key)
        .getOrCreate()
    )
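The spark.jars.packages entry pulls in hadoop-aws, which supplies the S3AFileSystem implementation named in spark.hadoop.fs.s3a.impl and is required for reading s3a:// paths; the joda-time entry in spark.jars.excludes avoids pulling in a conflicting copy of that library.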
def main():
    # ...
    config = ConfigLoader(["conf/base", "conf/local"])
    credentials = config.get("credentials*", "credentials*/**")

    # Initialise SparkSession
    spark = init_spark_session(
        credentials["aws"]["access_key"], credentials["aws"]["secret_key"]
    )

    # Run the pipeline (io, pipeline and parameters are created in the
    # omitted code above)
    io.add_feed_dict({'parameters': parameters}, replace=True)
    SequentialRunner().run(pipeline, io)
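For the credentials lookup above to work, conf/local/credentials.yml needs a matching aws entry. A minimal sketch (the key names follow the credentials["aws"][...] access pattern used above; the values are placeholders):

aws:
  access_key: YOUR_AWS_ACCESS_KEY_ID
  secret_key: YOUR_AWS_SECRET_ACCESS_KEY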
Since SparkSession is a singleton, the next time you call SparkSession.builder.getOrCreate() you will be provided with the same SparkSession you initialised at your app’s entry point.
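This means that, for example, a node can fetch the existing session without it being passed in explicitly. A minimal sketch (the assertion is purely illustrative):

from pyspark.sql import SparkSession

# Anywhere later in the application, e.g. inside a node:
spark = SparkSession.builder.getOrCreate()

# Repeated calls hand back the identical session object
assert spark is SparkSession.builder.getOrCreate()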
Creating a SparkDataSet¶
Having created a SparkSession, you can load your data using PySpark’s DataFrameReader. To do so, please use the provided SparkDataSet:
Code API¶
import pyspark.sql
from kedro.io import DataCatalog
from kedro.contrib.io.pyspark import SparkDataSet

spark_ds = SparkDataSet(
    filepath="s3a://your_bucket/data/01_raw/weather*",
    file_format="csv",
    load_args={"header": True, "inferSchema": True},
    save_args={"sep": "|", "header": True},
)
catalog = DataCatalog({"weather": spark_ds})
df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)
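Saving works symmetrically: save_args are forwarded to PySpark’s DataFrameWriter. A minimal sketch, assuming a second, hypothetical dataset with a concrete (non-wildcard) output path:

# hypothetical output dataset with a writable, non-wildcard path
output_ds = SparkDataSet(
    filepath="s3a://your_bucket/data/02_intermediate/weather",
    file_format="csv",
    save_args={"sep": "|", "header": True},
)
output_ds.save(df)  # writes df as pipe-separated CSV with a header row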
YAML API¶
In catalog.yml:
weather:
  type: kedro.contrib.io.pyspark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather*
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True
In run.py:
import pyspark.sql
from kedro.io import DataCatalog
from kedro.config import ConfigLoader

config = ConfigLoader(["conf/base", "conf/local"])
catalog = DataCatalog.from_config(
    config.get("catalog*", "catalog*/**"),
    config.get("credentials*", "credentials*/**"),
)
df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)
Working with PySpark and Kedro pipelines¶
Continuing from the example of the previous section, since catalog.load("weather") returns a pyspark.sql.DataFrame, any Kedro pipeline nodes which have weather as an input will be provided with a PySpark dataframe:
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node
from kedro.config import ConfigLoader
from kedro.runner import SequentialRunner


def my_node(weather):
    weather.show()  # weather is a pyspark.sql.DataFrame


def main():
    config = ConfigLoader(["conf/base", "conf/local"])
    io = DataCatalog.from_config(
        config.get("catalog*", "catalog*/**"),
        config.get("credentials*", "credentials*/**"),
    )
    pipeline = Pipeline([node(my_node, "weather", None)])
    SequentialRunner().run(pipeline, io)
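Nodes can also return PySpark dataframes, which the runner then saves through the catalog. A minimal sketch, assuming a hypothetical weather_cleaned entry is also registered in catalog.yml as a SparkDataSet:

def clean_weather(weather):
    # drop rows with missing values; the result is still a pyspark.sql.DataFrame
    return weather.dropna()

# the runner saves the returned dataframe via the weather_cleaned dataset
pipeline = Pipeline([node(clean_weather, "weather", "weather_cleaned")])
SequentialRunner().run(pipeline, io)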