kedro.contrib.io.pyspark.SparkJDBCDataSet

class kedro.contrib.io.pyspark.SparkJDBCDataSet(url, table, credentials=None, load_args=None, save_args=None)

Bases: kedro.io.core.AbstractDataSet
SparkJDBCDataSet loads data from a database table accessible via a JDBC URL and connection properties, and saves the content of a PySpark DataFrame to an external database table via JDBC. It uses pyspark.sql.DataFrameReader and pyspark.sql.DataFrameWriter internally, so it supports all PySpark options allowed on jdbc.

Example:
    import pandas as pd
    from pyspark.sql import SparkSession

    from kedro.contrib.io.pyspark import SparkJDBCDataSet

    spark = SparkSession.builder.getOrCreate()
    data = spark.createDataFrame(
        pd.DataFrame({'col1': [1, 2], 'col2': [4, 5], 'col3': [5, 6]})
    )

    url = 'jdbc:postgresql://localhost/test'
    table = 'table_a'
    connection_properties = {'driver': 'org.postgresql.Driver'}
    data_set = SparkJDBCDataSet(
        url=url,
        table=table,
        credentials={'user': 'scott', 'password': 'tiger'},
        load_args={'properties': connection_properties},
        save_args={'properties': connection_properties},
    )

    data_set.save(data)
    reloaded = data_set.load()

    assert data.toPandas().equals(reloaded.toPandas())
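Because load_args and save_args are handed straight to the PySpark jdbc reader and writer, any of their other options can be supplied as well. Below is a minimal sketch of a partitioned read and an overwriting save; the partition column, bounds and save mode are illustrative assumptions, not required settings.

    from kedro.contrib.io.pyspark import SparkJDBCDataSet

    # Sketch: forwarding extra PySpark jdbc options through load_args/save_args.
    # The partition column, bounds and save mode below are illustrative only.
    data_set = SparkJDBCDataSet(
        url='jdbc:postgresql://localhost/test',
        table='table_a',
        credentials={'user': 'scott', 'password': 'tiger'},
        load_args={
            'properties': {'driver': 'org.postgresql.Driver'},
            'column': 'col1',       # numeric column used to partition the read
            'lowerBound': 0,
            'upperBound': 100,
            'numPartitions': 4,
        },
        save_args={
            'properties': {'driver': 'org.postgresql.Driver'},
            'mode': 'overwrite',    # replace existing table contents on save
        },
    )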
Methods

SparkJDBCDataSet.__init__(url, table[, …])        Creates a new SparkJDBCDataSet.
SparkJDBCDataSet.from_config(name, config[, …])   Create a data set instance using the configuration provided.
SparkJDBCDataSet.load()                           Loads data by delegation to the provided load method.
SparkJDBCDataSet.save(data)                       Saves data by delegation to the provided save method.
__init__(url, table, credentials=None, load_args=None, save_args=None)

Creates a new SparkJDBCDataSet.
Parameters:
- url (str) – A JDBC URL of the form jdbc:subprotocol:subname.
- table (str) – The name of the table to load data from or save data to.
- credentials (Optional[Dict[str, Any]]) – A dictionary of JDBC database connection arguments, normally at least the properties user and password with their corresponding values. If provided, it updates the properties entry of load_args and save_args (see the sketch below this method).
- load_args (Optional[Dict[str, Any]]) – Passed to the underlying PySpark jdbc function along with the JDBC URL and the name of the table. All supported arguments are listed at https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameReader.jdbc
- save_args (Optional[Dict[str, Any]]) – Passed to the underlying PySpark jdbc function along with the JDBC URL and the name of the table. All supported arguments are listed at https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameWriter.jdbc
Raises: DataSetError – When either url or table is empty.

Return type: None
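The effect of the credentials argument on the connection properties can be pictured with plain dictionaries. This is an illustrative sketch based on the parameter description above, not the dataset's actual implementation.

    # Sketch only: how credentials are folded into the 'properties' of
    # load_args (and, analogously, save_args) per the description above.
    credentials = {'user': 'scott', 'password': 'tiger'}
    load_args = {'properties': {'driver': 'org.postgresql.Driver'}}

    merged = dict(load_args)
    merged['properties'] = {**load_args.get('properties', {}), **credentials}
    # merged['properties'] is now:
    # {'driver': 'org.postgresql.Driver', 'user': 'scott', 'password': 'tiger'}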
classmethod from_config(name, config, load_version=None, save_version=None)

Create a data set instance using the configuration provided.

Parameters:
- name (str) – Data set name.
- config (Dict[str, Any]) – Data set config dictionary.
- load_version (Optional[str]) – Version string to be used for the load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
- save_version (Optional[str]) – Version string to be used for the save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

Return type: AbstractDataSet

Returns: An instance of an AbstractDataSet subclass.

Raises: DataSetError – When the function fails to create the data set from its config.
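For illustration, from_config can build the same dataset from a plain dictionary. The 'type' key and the example values below are assumptions about how such a config is typically written, mirroring the constructor arguments above.

    from kedro.contrib.io.pyspark import SparkJDBCDataSet

    # Hedged sketch: a config dictionary mirroring the constructor arguments.
    # The 'type' key and all values are illustrative assumptions.
    config = {
        'type': 'kedro.contrib.io.pyspark.SparkJDBCDataSet',
        'url': 'jdbc:postgresql://localhost/test',
        'table': 'table_a',
        'credentials': {'user': 'scott', 'password': 'tiger'},
        'load_args': {'properties': {'driver': 'org.postgresql.Driver'}},
        'save_args': {'properties': {'driver': 'org.postgresql.Driver'}},
    }

    data_set = SparkJDBCDataSet.from_config('postgres_table_a', config)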
load()

Loads data by delegation to the provided load method.

Return type: Any

Returns: Data returned by the provided load method.

Raises: DataSetError – When the underlying load method raises an error.
save(data)

Saves data by delegation to the provided save method.

Parameters: data (Any) – The value to be saved by the provided save method.

Raises: DataSetError – When the underlying save method raises an error.

Return type: None