kedro.contrib.io.pyspark.SparkJDBCDataSet

class kedro.contrib.io.pyspark.SparkJDBCDataSet(url, table, credentials=None, load_args=None, save_args=None)[source]

Bases: kedro.io.core.AbstractDataSet

SparkJDBCDataSet loads data from a database table accessible via JDBC URL url and connection properties and saves the content of a PySpark DataFrame to an external database table via JDBC. It uses pyspark.sql.DataFrameReader and pyspark.sql.DataFrameWriter internally, so it supports all allowed PySpark options on jdbc.

Example:

import pandas as pd

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame(pd.DataFrame({'col1': [1, 2],
                                           'col2': [4, 5],
                                           'col3': [5, 6]}))
url = 'jdbc:postgresql://localhost/test'
table = 'table_a'
connection_properties = {'driver': 'org.postgresql.Driver'}
data_set = SparkJDBCDataSet(
    url=url, table=table, credentials={'user': 'scott',
                                       'password': 'tiger'},
    load_args={'properties': connection_properties},
    save_args={'properties': connection_properties})

data_set.save(data)
reloaded = data_set.load()

assert data.toPandas().equals(reloaded.toPandas())

Methods

SparkJDBCDataSet.__init__(url, table[, …]) Creates a new SparkJDBCDataSet.
SparkJDBCDataSet.exists() Checks whether a data set’s output already exists by calling the provided _exists() method.
SparkJDBCDataSet.from_config(name, config[, …]) Create a data set instance using the configuration provided.
SparkJDBCDataSet.load() Loads data by delegation to the provided load method.
SparkJDBCDataSet.save(data) Saves data by delegation to the provided save method.
__init__(url, table, credentials=None, load_args=None, save_args=None)[source]

Creates a new SparkJDBCDataSet.

Parameters:
Raises:

DataSetError – When either url or table is empty.

Return type:

None

exists()

Checks whether a data set’s output already exists by calling the provided _exists() method.

Return type:bool
Returns:Flag indicating whether the output already exists.
Raises:DataSetError – when underlying exists method raises error.
classmethod from_config(name, config, load_version=None, save_version=None)

Create a data set instance using the configuration provided.

Parameters:
  • name (str) – Data set name.
  • config (Dict[str, Any]) – Data set config dictionary.
  • load_version (Optional[str]) – Version string to be used for load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
  • save_version (Optional[str]) – Version string to be used for save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
Return type:

AbstractDataSet

Returns:

An instance of an AbstractDataSet subclass.

Raises:

DataSetError – When the function fails to create the data set from its config.

load()

Loads data by delegation to the provided load method.

Return type:Any
Returns:Data returned by the provided load method.
Raises:DataSetError – When underlying load method raises error.
save(data)

Saves data by delegation to the provided save method.

Parameters:data (Any) – the value to be saved by provided save method.
Raises:DataSetError – when underlying save method raises error.
Return type:None