kedro.contrib.io.pyspark package¶
kedro.contrib.io.pyspark provides I/O modules for Apache Spark.
Submodules¶
kedro.contrib.io.pyspark.spark_data_set module¶
AbstractDataSet implementation to access Spark data frames using pyspark.

class kedro.contrib.io.pyspark.spark_data_set.SparkDataSet(filepath, file_format='parquet', load_args=None, save_args=None)[source]¶

Bases: kedro.io.core.AbstractDataSet, kedro.io.core.ExistsMixin

SparkDataSet loads and saves Spark data frames.

Example:
    from pyspark.sql import SparkSession
    from pyspark.sql.types import (StructField, StringType,
                                   IntegerType, StructType)

    from kedro.contrib.io.pyspark import SparkDataSet

    schema = StructType([StructField("name", StringType(), True),
                         StructField("age", IntegerType(), True)])

    data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]

    spark_df = SparkSession.builder.getOrCreate() \
                           .createDataFrame(data, schema)

    data_set = SparkDataSet(filepath="test_data")
    data_set.save(spark_df)
    reloaded = data_set.load()
    reloaded.take(4)
__init__(filepath, file_format='parquet', load_args=None, save_args=None)[source]¶

Creates a new instance of SparkDataSet.

Parameters:
- filepath (str) – path to a Spark data frame.
- file_format (str) – file format used during load and save operations. These are the formats supported by the running SparkContext and include parquet and csv. For a list of supported formats please refer to the Apache Spark documentation at https://spark.apache.org/docs/latest/sql-programming-guide.html
- load_args (Optional[Dict[str, Any]]) – Load args passed to the Spark DataFrameReader load method. They depend on the selected file format. You can find a list of read options for each supported format in the Spark DataFrame read documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
- save_args (Optional[Dict[str, Any]]) – Save args passed to the Spark DataFrame write options. Similar to load_args, they depend on the selected file format. You can pass mode and partitionBy to specify your overwrite mode and partitioning respectively, as shown in the sketch after this list. You can find a list of write options for each format in the Spark DataFrame write documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

Return type: None
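A minimal sketch of constructing a CSV-backed instance, assuming an illustrative file path and option values (header and inferSchema are standard Spark CSV read options; mode and partitionBy are the write options mentioned above):

    from kedro.contrib.io.pyspark import SparkDataSet

    # Hypothetical CSV data set; the path is an assumption for the sketch.
    csv_data_set = SparkDataSet(
        filepath="data/01_raw/people.csv",
        file_format="csv",
        # read options passed through to the Spark DataFrameReader
        load_args={"header": True, "inferSchema": True},
        # write options: overwrite existing data and partition by "age"
        save_args={"mode": "overwrite", "partitionBy": ["age"]},
    )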
kedro.contrib.io.pyspark.spark_jdbc module¶
SparkJDBCDataSet to load and save a PySpark DataFrame via JDBC.
class kedro.contrib.io.pyspark.spark_jdbc.SparkJDBCDataSet(url, table, credentials=None, load_args=None, save_args=None)[source]¶

Bases: kedro.io.core.AbstractDataSet

SparkJDBCDataSet loads data from a database table accessible via a JDBC URL and connection properties, and saves the content of a PySpark DataFrame to an external database table via JDBC. It uses pyspark.sql.DataFrameReader and pyspark.sql.DataFrameWriter internally, so it supports all allowed PySpark options on jdbc.

Example:
    import pandas as pd
    from pyspark.sql import SparkSession

    from kedro.contrib.io.pyspark.spark_jdbc import SparkJDBCDataSet

    spark = SparkSession.builder.getOrCreate()
    data = spark.createDataFrame(pd.DataFrame({'col1': [1, 2],
                                               'col2': [4, 5],
                                               'col3': [5, 6]}))
    url = 'jdbc:postgresql://localhost/test'
    table = 'table_a'
    connection_properties = {'driver': 'org.postgresql.Driver'}

    data_set = SparkJDBCDataSet(
        url=url,
        table=table,
        credentials={'user': 'scott', 'password': 'tiger'},
        load_args={'properties': connection_properties},
        save_args={'properties': connection_properties})

    data_set.save(data)
    reloaded = data_set.load()

    assert data.toPandas().equals(reloaded.toPandas())
__init__(url, table, credentials=None, load_args=None, save_args=None)[source]¶

Creates a new SparkJDBCDataSet.

Parameters:
- url (str) – A JDBC URL of the form jdbc:subprotocol:subname.
- table (str) – The name of the table to load data from or save data to.
- credentials (Optional[Dict[str, Any]]) – A dictionary of JDBC database connection arguments, normally at least the properties user and password with their corresponding values. If provided, it updates the properties parameter in load_args and save_args.
- load_args (Optional[Dict[str, Any]]) – Provided to the underlying PySpark jdbc function along with the JDBC URL and the name of the table. For all supported arguments, see https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameReader.jdbc
- save_args (Optional[Dict[str, Any]]) – Provided to the underlying PySpark jdbc function along with the JDBC URL and the name of the table, as illustrated in the sketch below. For all supported arguments, see https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameWriter.jdbc

Raises: DataSetError – When either url or table is empty.

Return type: None
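As a further illustration of how load_args and save_args map onto the underlying jdbc calls, here is a minimal sketch; the URL, table, driver, column name and bounds are assumptions for the example, not values taken from this documentation:

    from kedro.contrib.io.pyspark.spark_jdbc import SparkJDBCDataSet

    properties = {'driver': 'org.postgresql.Driver'}

    # Hypothetical data set: the read is partitioned on an integer "id" column
    # (column/lowerBound/upperBound/numPartitions are DataFrameReader.jdbc
    # arguments) and the write appends to the table (mode is a
    # DataFrameWriter.jdbc argument). The user/password pair from credentials
    # is merged into the properties of both load_args and save_args.
    data_set = SparkJDBCDataSet(
        url='jdbc:postgresql://localhost/test',
        table='table_a',
        credentials={'user': 'scott', 'password': 'tiger'},
        load_args={'properties': properties,
                   'column': 'id',
                   'lowerBound': 0,
                   'upperBound': 1000,
                   'numPartitions': 4},
        save_args={'properties': properties, 'mode': 'append'})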