Source code for kedro.io.partitioned_data_set

# Copyright 2018-2019 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.

"""``PartitionedDataSet`` loads and saves partitioned file-like data using the
underlying dataset definition. It also uses `fsspec` for filesystem level operations.
"""

from copy import deepcopy
from functools import lru_cache
from typing import Any, Callable, Dict, List, Tuple, Type, Union
from warnings import warn

import fsspec
from fsspec.utils import infer_storage_options

from kedro.io.core import (
    VERSION_KEY,
    VERSIONED_FLAG_KEY,
    AbstractDataSet,
    DataSetError,
    parse_dataset_definition,
)
from kedro.io.data_catalog import CREDENTIALS_KEY

DATASET_CREDENTIALS_KEY = "dataset_credentials"

S3_PROTOCOLS = ("s3", "s3a", "s3n")


class PartitionedDataSet(AbstractDataSet):
    # pylint: disable=too-many-instance-attributes
    """``PartitionedDataSet`` loads and saves partitioned file-like data using
    the underlying dataset definition. For filesystem level operations it uses
    `fsspec`: https://github.com/intake/filesystem_spec.

    Example:
    ::

        >>> import pandas as pd
        >>> from kedro.io import PartitionedDataSet
        >>>
        >>> credentials = {
        >>>     "key1": "secret1",  # will be passed to 'fsspec.filesystem()' call
        >>>     "dataset_credentials": {  # will be passed to the dataset initializer
        >>>         "key2": "secret2",
        >>>         "key3": "secret3"
        >>>     }
        >>> }
        >>>
        >>> data_set = PartitionedDataSet(
        >>>     path="s3://bucket-name/path/to/folder",
        >>>     dataset="CSVS3DataSet",
        >>>     credentials=credentials
        >>> )
        >>> loaded = data_set.load()
        >>> # assert isinstance(loaded, dict)
        >>>
        >>> combine_all = pd.DataFrame()
        >>>
        >>> for partition_id, partition_load_func in loaded.items():
        >>>     partition_data = partition_load_func()
        >>>     combine_all = pd.concat(
        >>>         [combine_all, partition_data], ignore_index=True, sort=True
        >>>     )
        >>>
        >>> new_data = pd.DataFrame({"new": [1, 2]})
        >>> # creates "s3://bucket-name/path/to/folder/new/partition.csv"
        >>> data_set.save({"new/partition.csv": new_data})
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        path: str,
        dataset: Union[str, Type[AbstractDataSet], Dict[str, Any]],
        filepath_arg: str = "filepath",
        filename_suffix: str = "",
        credentials: Dict[str, Any] = None,
        load_args: Dict[str, Any] = None,
    ):
        """Creates a new instance of ``PartitionedDataSet``.

        Args:
            path: Path to the folder containing partitioned data.
                If the path starts with a protocol (e.g., ``s3://``), the
                corresponding ``fsspec`` concrete filesystem implementation
                will be used. If no protocol is specified,
                ``fsspec.implementations.local.LocalFileSystem`` will be used.
                **Note:** Some concrete implementations are bundled with
                ``fsspec``, while others (like ``s3`` or ``gcs``) must be
                installed separately prior to usage of the
                ``PartitionedDataSet``.
            dataset: Underlying dataset definition. This is used to instantiate
                the dataset for each file located inside the ``path``.
                Accepted formats are (see the illustrative sketch after this
                method):
                a) a class that inherits from ``AbstractDataSet``
                b) a string representing a fully qualified class name of such
                a class
                c) a dictionary with a ``type`` key pointing to a string
                from b); all other keys are passed to the dataset initializer.
                **Note:** Credentials resolution is *not* currently supported
                for the underlying dataset definition.
            filepath_arg: Underlying dataset initializer argument that will
                contain a path to each corresponding partition file.
                If unspecified, defaults to "filepath".
            filename_suffix: If specified, only partitions that end with this
                string will be processed.
            credentials: Protocol-specific options that will be passed to the
                ``fsspec.filesystem`` call:
                https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
                _and_ also to the underlying dataset initializer. If the
                ``dataset_credentials`` key is present in this dictionary, then
                only its value will be passed to the dataset initializer
                ``credentials`` argument instead of a copy of the entire
                dictionary.
                Example 1: If ``credentials = {"k1": "secret1"}``, then the
                filesystem is called as ``filesystem(..., k1="secret1")`` and
                the dataset is instantiated as
                ``dataset_class(..., credentials={"k1": "secret1"})``.
                Example 2: If ``credentials = {"k1": "secret1",
                "dataset_credentials": {"k2": "secret2"}}``, then the
                filesystem is called as ``filesystem(..., k1="secret1")`` and
                the dataset is instantiated as
                ``dataset_class(..., credentials={"k2": "secret2"})``.
                Example 3: If ``credentials = {"dataset_credentials":
                {"k2": "secret2"}}``, then no credentials are passed to the
                filesystem call and the dataset is instantiated as
                ``dataset_class(..., credentials={"k2": "secret2"})``.
                Example 4: If ``credentials = {"k1": "secret1",
                "dataset_credentials": None}``, then the filesystem is called
                as ``filesystem(..., k1="secret1")`` and no credentials are
                passed to the dataset initializer.
            load_args: Keyword arguments to be passed into the ``find()``
                method of the filesystem implementation.

        Raises:
            DataSetError: If versioning is enabled for the underlying dataset.
        """
        super().__init__()

        self._path = path
        self._filename_suffix = filename_suffix
        self._protocol = infer_storage_options(self._path)["protocol"]

        dataset = dataset if isinstance(dataset, dict) else {"type": dataset}
        self._dataset_type, self._dataset_config = parse_dataset_definition(dataset)
        if VERSION_KEY in self._dataset_config:
            raise DataSetError(
                "`{}` does not support versioning of the underlying dataset. "
                "Please remove `{}` flag from the dataset definition.".format(
                    self.__class__.__name__, VERSIONED_FLAG_KEY
                )
            )

        if CREDENTIALS_KEY in self._dataset_config:
            raise DataSetError(
                "Credentials for the underlying dataset must not be specified "
                "explicitly in dataset configuration. Please put those under "
                "`dataset_credentials` key in a dictionary and pass as "
                "`credentials` argument to {} initializer.".format(
                    self.__class__.__name__
                )
            )

        self._credentials, dataset_credentials = _split_credentials(credentials)
        if dataset_credentials:
            self._dataset_config[CREDENTIALS_KEY] = dataset_credentials

        self._filepath_arg = filepath_arg
        if self._filepath_arg in self._dataset_config:
            warn(
                "`{}` key must not be specified in the dataset definition as it "
                "will be overwritten by partition path".format(self._filepath_arg)
            )

        self._load_args = deepcopy(load_args) or {}
        self._sep = self._filesystem.sep
        # since some filesystem implementations may implement a global cache
        self.invalidate_cache()

    @property
    def _filesystem(self) -> fsspec.AbstractFileSystem:
        protocol = "s3" if self._protocol in S3_PROTOCOLS else self._protocol
        return fsspec.filesystem(protocol, **self._credentials)

    @lru_cache(maxsize=None)
    def _list_partitions(self) -> List[str]:
        return [
            path
            for path in self._filesystem.find(self._path, **self._load_args)
            if path.endswith(self._filename_suffix)
        ]

    def _join_protocol(self, path: str) -> str:
        if self._path.startswith(self._protocol) and not path.startswith(
            self._protocol
        ):
            return "{}://{}".format(self._protocol, path)
        return path

    def _partition_to_path(self, path: str):
        dir_path = self._path.rstrip(self._sep)
        path = path.lstrip(self._sep)
        full_path = self._sep.join([dir_path, path]) + self._filename_suffix
        return full_path

    def _path_to_partition(self, path: str) -> str:
        dir_path = self._filesystem._strip_protocol(  # pylint: disable=protected-access
            self._path
        )
        path = path.split(dir_path, 1).pop().lstrip(self._sep)
        if self._filename_suffix and path.endswith(self._filename_suffix):
            path = path[: -len(self._filename_suffix)]
        return path

    def _load(self) -> Dict[str, Callable[[], Any]]:
        partitions = {}

        for partition in self._list_partitions():
            kwargs = deepcopy(self._dataset_config)
            # join the protocol back since PySpark may rely on it
            kwargs[self._filepath_arg] = self._join_protocol(partition)
            dataset = self._dataset_type(**kwargs)  # type: ignore
            partition_id = self._path_to_partition(partition)
            partitions[partition_id] = dataset.load

        if not partitions:
            raise DataSetError("No partitions found in `{}`".format(self._path))

        return partitions

    def _save(self, data: Dict[str, Any]) -> None:
        for partition_id, partition_data in sorted(data.items()):
            kwargs = deepcopy(self._dataset_config)
            partition = self._partition_to_path(partition_id)
            # join the protocol back since tools like PySpark may rely on it
            kwargs[self._filepath_arg] = self._join_protocol(partition)
            dataset = self._dataset_type(**kwargs)  # type: ignore
            dataset.save(partition_data)
        self.invalidate_cache()

    def _describe(self) -> Dict[str, Any]:
        clean_dataset_config = (
            {k: v for k, v in self._dataset_config.items() if k != CREDENTIALS_KEY}
            if isinstance(self._dataset_config, dict)
            else self._dataset_config
        )
        return dict(
            path=self._path,
            dataset_type=self._dataset_type.__name__,
            dataset_config=clean_dataset_config,
        )
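
    # Illustrative sketch of the partition id <-> full path round trip performed
    # by ``_partition_to_path`` and ``_path_to_partition``. The values below are
    # assumptions based on the S3 example in the class docstring, with
    # ``filename_suffix=".csv"``:
    #
    # >>> data_set._partition_to_path("new/partition")
    # 's3://bucket-name/path/to/folder/new/partition.csv'
    # >>> data_set._path_to_partition("bucket-name/path/to/folder/new/partition.csv")
    # 'new/partition'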

    def invalidate_cache(self):
        """Invalidate `_list_partitions` method and underlying filesystem caches."""
        self._list_partitions.cache_clear()
        self._filesystem.invalidate_cache(self._path)

    def _exists(self) -> bool:
        return bool(self._list_partitions())

    def _release(self) -> None:
        self.invalidate_cache()


def _split_credentials(
    credentials: Union[Dict[str, Any], None]
) -> Tuple[Dict[str, Any], Any]:
    credentials = deepcopy(credentials) or {}
    dataset_credentials = credentials.pop(
        DATASET_CREDENTIALS_KEY, deepcopy(credentials)
    )
    return credentials, dataset_credentials
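
# Doctest-style sketch of how ``_split_credentials`` divides the ``credentials``
# dictionary between the ``fsspec.filesystem`` call and the underlying dataset
# initializer, mirroring Examples 1-4 in the ``__init__`` docstring:
#
# >>> _split_credentials({"k1": "secret1"})
# ({'k1': 'secret1'}, {'k1': 'secret1'})
# >>> _split_credentials({"k1": "secret1", "dataset_credentials": {"k2": "secret2"}})
# ({'k1': 'secret1'}, {'k2': 'secret2'})
# >>> _split_credentials({"dataset_credentials": {"k2": "secret2"}})
# ({}, {'k2': 'secret2'})
# >>> _split_credentials({"k1": "secret1", "dataset_credentials": None})
# ({'k1': 'secret1'}, None)
# >>> _split_credentials(None)
# ({}, {})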