kedro.contrib.io.pyspark package

kedro.contrib.io.pyspark provides I/O modules for Apache Spark.

Submodules

kedro.contrib.io.pyspark.spark_data_set module

AbstractDataSet implementation to access Spark data frames using pyspark.

class kedro.contrib.io.pyspark.spark_data_set.SparkDataSet(filepath, file_format='parquet', load_args=None, save_args=None)[source]

Bases: kedro.io.core.AbstractDataSet, kedro.io.core.ExistsMixin

SparkDataSet loads and saves Spark data frames.

Example:

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)

from kedro.contrib.io.pyspark import SparkDataSet

schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])

data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]

spark_df = SparkSession.builder.getOrCreate()\
                               .createDataFrame(data, schema)

data_set = SparkDataSet(filepath="test_data")
data_set.save(spark_df)
reloaded = data_set.load()

reloaded.take(4)

__init__(filepath, file_format='parquet', load_args=None, save_args=None)[source]

Creates a new instance of SparkDataSet.

Parameters:

filepath (str) – path to a Spark data frame.

file_format (str) – file format used during load and save operations, e.g. parquet or csv; it must be a format supported by the running Spark session.

load_args (Optional[Dict[str, Any]]) – load arguments passed to the Spark DataFrameReader; they depend on the selected file format.

save_args (Optional[Dict[str, Any]]) – save arguments passed to the Spark DataFrameWriter; they depend on the selected file format.

Return type:

None
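
As an illustration, a CSV-backed data set could be configured as below; the file path and the reader/writer options are placeholders rather than fixed parts of the API:

from kedro.contrib.io.pyspark import SparkDataSet

# Hypothetical CSV-backed data set: ``load_args`` are forwarded to the
# Spark reader and ``save_args`` to the Spark writer for the chosen format.
cars = SparkDataSet(
    filepath="data/01_raw/cars.csv",
    file_format="csv",
    load_args={"header": True, "inferSchema": True},
    save_args={"header": True, "mode": "overwrite"},
)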

kedro.contrib.io.pyspark.spark_jdbc module

SparkJDBCDataSet to load and save a PySpark DataFrame via JDBC.

class kedro.contrib.io.pyspark.spark_jdbc.SparkJDBCDataSet(url, table, credentials=None, load_args=None, save_args=None)[source]

Bases: kedro.io.core.AbstractDataSet

SparkJDBCDataSet loads data from a database table accessible via the JDBC URL url and connection properties, and saves the content of a PySpark DataFrame to an external database table via JDBC. It uses pyspark.sql.DataFrameReader and pyspark.sql.DataFrameWriter internally, so it supports all allowed PySpark options on jdbc.

Example:

import pandas as pd

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame(pd.DataFrame({'col1': [1, 2],
                                           'col2': [4, 5],
                                           'col3': [5, 6]}))
url = 'jdbc:postgresql://localhost/test'
table = 'table_a'
connection_properties = {'driver': 'org.postgresql.Driver'}
data_set = SparkJDBCDataSet(
    url=url, table=table, credentials={'user': 'scott',
                                       'password': 'tiger'},
    load_args={'properties': connection_properties},
    save_args={'properties': connection_properties})

data_set.save(data)
reloaded = data_set.load()

assert data.toPandas().equals(reloaded.toPandas())

__init__(url, table, credentials=None, load_args=None, save_args=None)[source]

Creates a new SparkJDBCDataSet.

Parameters:

url (str) – a JDBC URL of the form jdbc:subprotocol:subname.

table (str) – the name of the table to load data from or save data to.

credentials (Optional[Dict[str, Any]]) – a dictionary of JDBC database connection arguments, e.g. user and password, which are merged into the connection properties used on load and save.

load_args (Optional[Dict[str, Any]]) – provided to the underlying PySpark jdbc function along with the JDBC URL and the name of the table.

save_args (Optional[Dict[str, Any]]) – provided to the underlying PySpark jdbc function along with the JDBC URL and the name of the table.

Raises:

DataSetError – When either url or table is empty.

Return type:

None
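
Since load_args and save_args are handed to pyspark's jdbc reader and writer, options such as partitioned reads or the write mode can be configured through them. A minimal sketch with placeholder connection details:

from kedro.contrib.io.pyspark import SparkJDBCDataSet

# Sketch only: the URL, table name and credentials are placeholders.
data_set = SparkJDBCDataSet(
    url="jdbc:postgresql://localhost/test",
    table="table_a",
    credentials={"user": "scott", "password": "tiger"},
    load_args={
        # read the table in 10 partitions split over the ``id`` column
        "column": "id",
        "lowerBound": 0,
        "upperBound": 1000000,
        "numPartitions": 10,
        "properties": {"driver": "org.postgresql.Driver"},
    },
    save_args={
        # replace the target table when saving
        "mode": "overwrite",
        "properties": {"driver": "org.postgresql.Driver"},
    },
)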