Advanced IO¶
Note: This documentation is based on Kedro 0.15.5. If you spot anything that is incorrect, please create an issue or pull request.
In this tutorial, you will learn about advanced uses of the Kedro IO module and understand the underlying implementation.
Relevant API documentation: AbstractDataSet, DataSetError
Error handling¶
We have custom exceptions for the main classes of errors that you can handle to deal with failures.
from kedro.io import *

io = DataCatalog(data_sets=dict())  # empty catalog

try:
    cars_df = io.load('cars')
except DataSetError:
    print("Error raised.")
AbstractDataSet¶
To understand what is going on behind the scenes, you should study the AbstractDataSet interface. AbstractDataSet is the underlying interface that all datasets extend. It requires subclasses to override the _load and _save methods and provides load and save methods that enrich the corresponding private methods with uniform error handling. It also requires subclasses to override _describe, which is used in logging the internal information about the instances of your custom AbstractDataSet implementation.
If you have a dataset called parts, you can make direct calls to it like so:
parts_df = parts.load()
However, we recommend using a DataCatalog instead (for more details, see this section in the User Guide) as it has been designed to make all datasets available to project members. For contributors, if you would like to submit a new dataset, you will have to extend AbstractDataSet.
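To illustrate the interface described above, here is a minimal sketch of a non-versioned custom dataset. The class name, parameters and CSV handling are hypothetical and chosen for brevity; real implementations should follow the patterns in the built-in datasets.

from pathlib import Path

import pandas as pd

from kedro.io import AbstractDataSet


class MyCSVDataSet(AbstractDataSet):
    """Hypothetical example: loads and saves a CSV file using pandas."""

    def __init__(self, filepath):
        self._filepath = Path(filepath)

    def _load(self) -> pd.DataFrame:
        return pd.read_csv(self._filepath)

    def _save(self, df: pd.DataFrame) -> None:
        df.to_csv(self._filepath, index=False)

    def _describe(self):
        return dict(filepath=self._filepath)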
Versioning¶
In order to enable versioning, you need to update the catalog.yml config file and set the versioned attribute to true for the given dataset. If this is a custom dataset, the implementation must also:
- extend kedro.io.core.AbstractVersionedDataSet AND
- add version namedtuple as an argument to its __init__ method AND
- call super().__init__() with positional arguments filepath, version, and, optionally, with glob and exists functions if it uses a non-local filesystem (see kedro.io.CSVLocalDataSet and kedro.io.CSVS3DataSet for examples) AND
- modify its _describe, _load and _save methods respectively to support versioning (see kedro.io.CSVLocalDataSet for an example implementation)
An example dataset could look similar to the below:
from pathlib import Path

import pandas as pd

from kedro.io import AbstractVersionedDataSet


class MyOwnDataSet(AbstractVersionedDataSet):
    def __init__(self, param1, param2, filepath, version):
        super().__init__(Path(filepath), version)
        self._param1 = param1
        self._param2 = param2

    def _load(self) -> pd.DataFrame:
        load_path = self._get_load_path()
        return pd.read_csv(load_path)

    def _save(self, df: pd.DataFrame) -> None:
        save_path = self._get_save_path()
        df.to_csv(save_path)

    def _describe(self):
        return dict(version=self._version, param1=self._param1, param2=self._param2)
With catalog.yml specifying:
my_dataset:
  type: <path-to-my-own-dataset>.MyOwnDataSet
  filepath: data/01_raw/my_data.csv
  versioned: true
version namedtuple¶
A versioned dataset's __init__ method must have an optional argument called version with a default value of None. If provided, this argument must be an instance of kedro.io.core.Version. Its load and save attributes must either be None or contain string values representing exact load and save versions:
- If version is None, then the dataset is considered not versioned.
- If version.load is None, then the latest available version will be used to load the dataset; otherwise a string representing the exact load version must be provided.
- If version.save is None, then a new save version string will be generated by calling kedro.io.core.generate_timestamp(); otherwise a string representing the exact save version must be provided.
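As a quick illustration of these rules, the snippet below constructs a few Version objects; the version strings are arbitrary examples:

from kedro.io import Version

# not versioned at all
no_version = None

# load the latest available version, auto-generate a save version on save
auto_version = Version(load=None, save=None)

# pin both load and save to exact version strings
pinned_version = Version(load="2019-02-13T14.35.36.518Z", save="2019-02-13T14.35.36.518Z")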
Versioning using the YAML API¶
The easiest way to version a specific dataset is to change the corresponding entry in the catalog.yml.
Note: catalog.yml only allows you to choose whether to version your datasets; it does not allow you to choose which version to load or save. In the rare case where this is strongly required, you may want to instantiate your versioned datasets using the Code API and define the version parameter explicitly (see the corresponding section below).
For example, if the following dataset was defined in the catalog.yml:
cars.csv:
  type: CSVLocalDataSet
  filepath: data/01_raw/company/cars.csv
  versioned: true
the DataCatalog will create a versioned CSVLocalDataSet called cars.csv. The actual csv file location will look like data/01_raw/company/cars.csv/<version>/cars.csv, where <version> corresponds to a global save version string formatted as YYYY-MM-DDThh.mm.ss.sssZ. Every time the DataCatalog is instantiated, it generates a new global save version, which is propagated to all versioned datasets it contains.
Important: the DataCatalog does not re-generate save versions between instantiations. Therefore, if you call catalog.save('cars.csv', some_data) twice, then the second call will fail, since it tries to overwrite a versioned dataset using the same save version. This limitation does not apply to the load operation.
By default, the DataCatalog will load the latest version of the dataset. However, it is also possible to specify an exact load version. In order to do that, you can pass a dictionary with exact load versions to DataCatalog.from_config:
load_versions = {'cars.csv': '2019-02-13T14.35.36.518Z'}
io = DataCatalog.from_config(catalog_config, credentials, load_versions=load_versions)
cars = io.load('cars.csv')
The last row in the example above would attempt to load a CSV file from data/01_raw/company/cars.csv/2019-02-13T14.35.36.518Z/cars.csv.
The load_versions configuration has an effect only if dataset versioning has been enabled in the catalog config file - see the example above.
Important: we recommend not overriding the save_version argument in DataCatalog.from_config unless strongly required to do so, since it may lead to inconsistencies between loaded and saved versions of the versioned datasets.
Versioning using the Code API¶
Although we recommend enabling versioning using the catalog.yml config file as described in the section above, you may require more control over the load and save versions of a specific dataset. To achieve this you can instantiate Version and pass it as a parameter to the dataset initialisation:
from kedro.io import CSVLocalDataSet, DataCatalog, Version
import pandas as pd
data1 = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
data2 = pd.DataFrame({"col1": [7], "col2": [8], "col3": [9]})
version = Version(
    load=None,  # load the latest available version
    save=None,  # generate save version automatically on each save operation
)
test_data_set = CSVLocalDataSet(
    filepath="data/01_raw/test.csv",
    save_args={"index": False},
    version=version,
)
io = DataCatalog({"test_data_set": test_data_set})
# save the dataset to data/01_raw/test.csv/<version>/test.csv
io.save("test_data_set", data1)
# save the dataset into a new file data/01_raw/test.csv/<version>/test.csv
io.save("test_data_set", data2)
# load the latest version from data/01_raw/test.csv/*/test.csv
reloaded = io.load("test_data_set")
assert data2.equals(reloaded)
Note: In the example above we did not fix any versions. If we do, then the behaviour of load and save operations becomes slightly different:
version = Version(
    load="my_exact_version",  # load exact version
    save="my_exact_version",  # save to exact version
)
test_data_set = CSVLocalDataSet(
    filepath="data/01_raw/test.csv",
    save_args={"index": False},
    version=version,
)
io = DataCatalog({"test_data_set": test_data_set})
# save the dataset to data/01_raw/test.csv/my_exact_version/test.csv
io.save("test_data_set", data1)
# load from data/01_raw/test.csv/my_exact_version/test.csv
reloaded = io.load("test_data_set")
assert data1.equals(reloaded)
# raises DataSetError since the path
# data/01_raw/test.csv/my_exact_version/test.csv already exists
io.save("test_data_set", data2)
Important: Passing exact load and/or save versions to the dataset instantiation is not recommended, since it may lead to inconsistencies between operations. For example, if the versions for load and save operations do not match, the save operation will result in a UserWarning indicating that the save and load versions do not match. A load after a save may also raise an error if the corresponding load version is not found:
version = Version(
    load="exact_load_version",  # load exact version
    save="exact_save_version"  # save to exact version
)
test_data_set = CSVLocalDataSet(
    filepath="data/01_raw/test.csv",
    save_args={"index": False},
    version=version,
)
io = DataCatalog({"test_data_set": test_data_set})
io.save("test_data_set", data1) # emits a UserWarning due to version inconsistency
# raises DataSetError since the data/01_raw/test.csv/exact_load_version/test.csv
# file does not exist
reloaded = io.load("test_data_set")
Supported datasets¶
Currently the following datasets support versioning:
CSVLocalDataSet
CSVS3DataSet
HDFLocalDataSet
HDFS3DataSet
JSONLocalDataSet
JSONDataSet
ParquetLocalDataSet
PickleLocalDataSet
PickleS3DataSet
TextLocalDataSet
ExcelLocalDataSet
kedro.contrib.io.azure.CSVBlobDataSet
kedro.contrib.io.feather.FeatherLocalDataSet
kedro.contrib.io.networkx.NetworkXLocalDataSet
kedro.contrib.io.parquet.ParquetS3DataSet
kedro.contrib.io.pyspark.SparkDataSet
kedro.contrib.io.gcs.JSONGCSDataSet
kedro.contrib.io.gcs.CSVGCSDataSet
kedro.contrib.io.gcs.ParquetGCSDataSet
Partitioned dataset¶
These days distributed systems play an increasingly important role in ETL data pipelines. They significantly increase the processing throughput, enabling us to work with much larger volumes of input data. However, these benefits sometimes come at a cost. When dealing with the input data generated by such distributed systems, you may encounter a situation where your Kedro node needs to read the data from a directory full of uniform files of the same type (e.g. JSON, CSV, Parquet, etc.) rather than from a single file. Tools like PySpark and the corresponding SparkDataSet cater for such use cases, but the use of Spark is not always feasible.
This is the reason why Kedro provides a built-in PartitionedDataSet, which has the following features:
- PartitionedDataSet can recursively load all or specific files from a given location.
- It is platform agnostic and can work with any filesystem implementation supported by fsspec including local, S3, GCS, and many more.
- It implements a lazy loading approach and does not attempt to load any partition data until a processing node explicitly requests it.
Note: In this section each individual file inside a given location is called a partition.
Partitioned dataset definition¶
A PartitionedDataSet definition can be put in your catalog.yml like any other regular dataset definition; the definition represents the following structure:
# conf/base/catalog.yml

my_partitioned_dataset:
  type: "PartitionedDataSet"
  path: "s3://my-bucket-name/path/to/folder"  # path to the location of partitions
  dataset: "CSVS3DataSet"  # shorthand notation for the dataset which will handle individual partitions
  credentials: "my_credentials"
  load_args:
    load_arg1: "value1"
    load_arg2: "value2"
Note: Like any other dataset, PartitionedDataSet can also be instantiated programmatically in Python:
from kedro.io import CSVS3DataSet, PartitionedDataSet
my_credentials = {...} # credentials dictionary
my_partitioned_dataset = PartitionedDataSet(
    path="s3://my-bucket-name/path/to/folder",
    dataset=CSVS3DataSet,
    credentials=my_credentials,
    load_args={"load_arg1": "value1", "load_arg2": "value2"}
)
Alternatively, if you need more granular configuration of the underlying dataset, its definition can be provided in full:
# conf/base/catalog.yml

my_partitioned_dataset:
  type: "PartitionedDataSet"
  path: "s3://my-bucket-name/path/to/folder"
  dataset:  # full dataset config notation
    type: "kedro.io.csv_s3.CSVS3DataSet"  # supports any importable fully qualified class path
    load_args:
      delimiter: ","
    save_args:
      index: false
  credentials: "my_credentials"
  load_args:
    load_arg1: "value1"
    load_arg2: "value2"
  filepath_arg: "filepath"  # the argument of the dataset to pass the filepath to
  filename_suffix: ".csv"
Here is an exhaustive list of the arguments supported by PartitionedDataSet (a short programmatic illustration follows the table):
| Argument | Required | Supported types | Description |
| :------: | :------: | :-------------: | ----------- |
| path | Yes | str | Path to the folder containing partitioned data. If path starts with the protocol (e.g., s3://) then the corresponding fsspec concrete filesystem implementation will be used. If protocol is not specified, the local filesystem will be used |
| dataset | Yes | str, Type[AbstractDataSet], Dict[str, Any] | Underlying dataset definition, for more details see the section below |
| credentials | No | Dict[str, Any] | Protocol-specific options that will be passed to the fsspec.filesystem call, for more details see the section below |
| load_args | No | Dict[str, Any] | Keyword arguments to be passed into the find() method of the corresponding filesystem implementation |
| filepath_arg | No (defaults to filepath) | str | Argument name of the underlying dataset initializer that will contain a path to an individual partition |
| filename_suffix | No (defaults to an empty string) | str | If specified, partitions that don't end with this string will be ignored |
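As a sketch of how filepath_arg and filename_suffix fit together, the snippet below instantiates a PartitionedDataSet programmatically; the bucket path is hypothetical, and load_args is simply forwarded to the filesystem's find() call as described in the table above:

from kedro.io import CSVS3DataSet, PartitionedDataSet

# only files ending in ".csv" under the given path are treated as partitions;
# each partition path is passed to CSVS3DataSet via its "filepath" argument
my_partitioned_dataset = PartitionedDataSet(
    path="s3://my-bucket-name/path/to/folder",  # hypothetical bucket
    dataset=CSVS3DataSet,
    filepath_arg="filepath",
    filename_suffix=".csv",
    load_args={"maxdepth": 2},  # forwarded to the filesystem's find() method
)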
Dataset definition¶
The dataset definition should be passed into the dataset argument of the PartitionedDataSet. The dataset definition is used to instantiate a new dataset object for each individual partition, and that dataset object is used for load and save operations. The dataset definition supports shorthand and full notations.
Shorthand notation¶
Shorthand notation requires you to only specify a class of the underlying dataset, either as a string (e.g. CSVS3DataSet or a fully qualified class path like kedro.io.csv_s3.CSVS3DataSet) or as a class object that is a subclass of AbstractDataSet.
Full notation¶
Full notation allows you to specify a dictionary with the full underlying dataset definition except the following arguments (a programmatic sketch follows this list):
- The argument that receives the partition path (filepath by default) - if specified, a UserWarning will be emitted stating that this value will be overridden by individual partition paths
- credentials key - specifying it will result in DataSetError being raised; dataset credentials should be passed into the credentials argument of the PartitionedDataSet rather than the underlying dataset definition - see the section below for details
- versioned flag - specifying it will result in DataSetError being raised; versioning cannot be enabled for the underlying datasets
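For illustration only, here is a minimal sketch of the same full notation expressed programmatically; the dataset dictionary mirrors the YAML example above and the bucket path and credential values are hypothetical:

from kedro.io import PartitionedDataSet

my_partitioned_dataset = PartitionedDataSet(
    path="s3://my-bucket-name/path/to/folder",  # hypothetical bucket
    dataset={  # full dataset config notation as a dictionary
        "type": "kedro.io.csv_s3.CSVS3DataSet",
        "load_args": {"delimiter": ","},
        "save_args": {"index": False},
    },
    credentials={"foo": "bar"},  # passed to both the filesystem and the dataset (see the next section)
)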
Partitioned dataset credentials¶
The credentials dictionary is special in the sense that it may contain credentials for both the PartitionedDataSet itself and the underlying dataset that is used for partition load and save. Here is the full list of possible scenarios (a short programmatic illustration follows the table):
| Scenario | Example credentials dictionary | Description |
| :------: | :----------------------------: | ----------- |
| credentials is None or an empty dictionary | None | Credentials are not passed to the underlying dataset or the filesystem |
| credentials dictionary does not have a dataset_credentials key | {"foo": "bar"} | The whole contents of the credentials dictionary is passed to both the underlying dataset (CSVS3DataSet(..., credentials={"foo": "bar"})) and the filesystem (fsspec.filesystem(..., foo="bar")) |
| credentials dictionary has a non-empty dataset_credentials key | {"foo": "bar", "dataset_credentials": {"baz": "qux"}} | The contents of the dataset_credentials key is passed to the dataset: CSVS3DataSet(..., credentials={"baz": "qux"}), all other keys (if any) are passed to the filesystem: fsspec.filesystem(..., foo="bar") |
| credentials dictionary has an empty dataset_credentials key | {"foo": "bar", "dataset_credentials": None} | No credentials are passed to the dataset, all keys except dataset_credentials are passed to the filesystem: fsspec.filesystem(..., foo="bar") |
Partitioned dataset load¶
Let’s assume that the Kedro pipeline that you are working with contains the node defined as follows:
from kedro.pipeline import node
node(concat_partitions, inputs="my_partitioned_dataset", outputs="concatenated_result")
The underlying node function concat_partitions may look like this:
from typing import Any, Callable, Dict

import pandas as pd


def concat_partitions(partitioned_input: Dict[str, Callable[[], Any]]) -> pd.DataFrame:
    """Concatenate input partitions into one pandas DataFrame.

    Args:
        partitioned_input: A dictionary with partition ids as keys and load functions as values.

    Returns:
        Pandas DataFrame representing a concatenation of all loaded partitions.
    """
    result = pd.DataFrame()

    for partition_key, partition_load_func in sorted(partitioned_input.items()):
        partition_data = partition_load_func()  # load the actual partition data
        result = pd.concat([result, partition_data], ignore_index=True, sort=True)  # concat with existing result

    return result
As you can see from the example above, on load PartitionedDataSet does not automatically load the data from the located partitions. Instead, PartitionedDataSet returns a dictionary with partition IDs as keys and the corresponding load functions as values. This allows the node that consumes the PartitionedDataSet to implement the logic that defines which partitions need to be loaded and how this data is going to be processed, as the sketch below illustrates.
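For instance, a node could materialise only the partitions it needs rather than all of them. The sketch below filters partition IDs by a hypothetical date prefix before calling the load functions; the prefix and the downstream processing are illustrative only:

from typing import Any, Callable, Dict

import pandas as pd


def concat_december_partitions(partitioned_input: Dict[str, Callable[[], Any]]) -> pd.DataFrame:
    """Load and concatenate only the partitions whose IDs start with a given prefix."""
    selected = {
        partition_id: load_func
        for partition_id, load_func in partitioned_input.items()
        if partition_id.startswith("2019-12")  # hypothetical date-based partition IDs
    }
    result = pd.DataFrame()
    for partition_id in sorted(selected):
        partition_data = selected[partition_id]()  # materialise only the selected partitions
        result = pd.concat([result, partition_data], ignore_index=True, sort=True)
    return result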
Note: Partition ID does not represent the whole partition path, but only a part of it that is unique for a given partition and filename suffix:
- Example 1: if path="s3://my-bucket-name/folder" and a partition is stored in s3://my-bucket-name/folder/2019-12-04/data.csv, then its Partition ID is 2019-12-04/data.csv.
- Example 2: if path="s3://my-bucket-name/folder" and filename_suffix=".csv" and a partition is stored in s3://my-bucket-name/folder/2019-12-04/data.csv, then its Partition ID is 2019-12-04/data.
Note: PartitionedDataSet implements caching on the load operation, which means that if multiple nodes consume the same PartitionedDataSet, they will all receive the same partition dictionary even if some new partitions were added to the folder after the first load has been completed. This is done deliberately to guarantee the consistency of load operations between the nodes and avoid race conditions. You can reset the cache by calling the invalidate_cache() method of the partitioned dataset object, as shown below.
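A minimal usage sketch, assuming my_partitioned_dataset was instantiated programmatically as in the earlier example and using the method named above:

# force a re-scan of the folder on the next load, picking up any newly added partitions
my_partitioned_dataset.invalidate_cache()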
Partitioned dataset save¶
PartitionedDataSet also supports a save operation. Let's assume the following configuration:
# conf/base/catalog.yml

new_partitioned_dataset:
  type: "PartitionedDataSet"
  path: "s3://my-bucket-name"
  dataset: "CSVS3DataSet"
  filename_suffix: ".csv"
node definition:
from kedro.pipeline import node
node(create_partitions, inputs=None, outputs="new_partitioned_dataset")
and the underlying node function create_partitions:
from typing import Any, Dict

import pandas as pd


def create_partitions() -> Dict[str, Any]:
    """Create new partitions and save using PartitionedDataSet.

    Returns:
        Dictionary with the partitions to create.
    """
    return {
        # create a file "s3://my-bucket-name/part/foo.csv"
        "part/foo": pd.DataFrame({"data": [1, 2]}),
        # create a file "s3://my-bucket-name/part/bar.csv.csv"
        "part/bar.csv": pd.DataFrame({"data": [3, 4]}),
    }
Note: Writing to an existing partition may result in its data being overwritten, if this case is not specifically handled by the underlying dataset implementation. You should implement your own checks to ensure that no existing data is lost when writing to a PartitionedDataSet. The simplest safety mechanism could be to use partition IDs that have a high chance of uniqueness - for example, the current timestamp, as sketched below.
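As one possible approach, the sketch below uses a timestamp as the partition ID; the helper function and its formatting are illustrative, not part of the Kedro API:

from datetime import datetime, timezone
from typing import Any, Dict

import pandas as pd


def create_timestamped_partition() -> Dict[str, Any]:
    """Use a timestamp-based partition ID to reduce the risk of overwriting existing partitions."""
    partition_id = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H.%M.%S.%fZ")
    return {partition_id: pd.DataFrame({"data": [1, 2]})}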