Source code for kedro.io.pickle_s3

# Copyright 2018-2019 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.

"""``PickleS3DataSet`` loads and saves a Python object to a pickle file on S3.
The underlying functionality is supported by the ``pickle`` library, so
it supports all allowed options for loading and saving pickle files.
"""
import pickle
from typing import Any, Dict, Optional

from s3fs.core import S3FileSystem

from kedro.io.core import AbstractDataSet, DataSetError, S3PathVersionMixIn, Version


class PickleS3DataSet(AbstractDataSet, S3PathVersionMixIn):
    """``PickleS3DataSet`` loads and saves a Python object to a pickle file on
    S3. The underlying functionality is supported by the ``pickle`` library,
    so it supports all allowed options for loading and saving pickle files.

    Example:
    ::

        >>> from kedro.io import PickleS3DataSet
        >>> import pandas as pd
        >>>
        >>> dummy_data = pd.DataFrame({'col1': [1, 2],
        >>>                            'col2': [4, 5],
        >>>                            'col3': [5, 6]})
        >>> data_set = PickleS3DataSet(filepath="data.pkl",
        >>>                            bucket_name="test_bucket",
        >>>                            load_args=None,
        >>>                            save_args=None)
        >>> data_set.save(dummy_data)
        >>> reloaded = data_set.load()
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        filepath: str,
        bucket_name: str,
        credentials: Optional[Dict[str, Any]] = None,
        load_args: Optional[Dict[str, Any]] = None,
        save_args: Optional[Dict[str, Any]] = None,
        version: Version = None,
    ) -> None:
        """Creates a new instance of ``PickleS3DataSet`` pointing to a
        concrete file on S3. ``PickleS3DataSet`` uses the ``pickle`` backend
        to serialise objects to disk:

        pickle.dumps: https://docs.python.org/3/library/pickle.html#pickle.dumps

        and to load serialised objects into memory:

        pickle.loads: https://docs.python.org/3/library/pickle.html#pickle.loads

        Args:
            filepath: Path to a pkl file.
            bucket_name: S3 bucket name.
            credentials: Credentials to access the S3 bucket, such as
                ``aws_access_key_id``, ``aws_secret_access_key``.
            load_args: Options for loading pickle files. Refer to the help
                file of ``pickle.loads`` for options.
            save_args: Options for saving pickle files. Refer to the help
                file of ``pickle.dumps`` for options.
            version: If specified, should be an instance of
                ``kedro.io.core.Version``. If its ``load`` attribute is
                None, the latest version will be loaded. If its ``save``
                attribute is None, save version will be autogenerated.
        """
        default_load_args = {}
        default_save_args = {}

        self._filepath = filepath
        self._bucket_name = bucket_name
        self._credentials = credentials if credentials else {}
        self._version = version
        self._load_args = (
            {**default_load_args, **load_args}
            if load_args is not None
            else default_load_args
        )
        self._save_args = (
            {**default_save_args, **save_args}
            if save_args is not None
            else default_save_args
        )
        self._s3 = S3FileSystem(client_kwargs=self._credentials)
    @property
    def _client(self):
        return self._s3.s3

    def _describe(self) -> Dict[str, Any]:
        return dict(
            filepath=self._filepath,
            bucket_name=self._bucket_name,
            load_args=self._load_args,
            save_args=self._save_args,
            version=self._version,
        )

    def _load(self) -> Any:
        load_key = self._get_load_path(
            self._client, self._bucket_name, self._filepath, self._version
        )
        with self._s3.open(
            "{}/{}".format(self._bucket_name, load_key), mode="rb"
        ) as s3_file:
            return pickle.loads(s3_file.read(), **self._load_args)

    def _save(self, data: Any) -> None:
        save_key = self._get_save_path(
            self._client, self._bucket_name, self._filepath, self._version
        )
        bytes_object = pickle.dumps(data, **self._save_args)
        with self._s3.open(
            "{}/{}".format(self._bucket_name, save_key), mode="wb"
        ) as s3_file:
            s3_file.write(bytes_object)

        load_key = self._get_load_path(
            self._client, self._bucket_name, self._filepath, self._version
        )
        self._check_paths_consistency(load_key, save_key)

    def _exists(self) -> bool:
        try:
            load_key = self._get_load_path(
                self._client, self._bucket_name, self._filepath, self._version
            )
        except DataSetError:
            return False
        args = (self._client, self._bucket_name, load_key)
        return any(key == load_key for key in self._list_objects(*args))
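
For reference, here is a minimal usage sketch of the class above with explicit
credentials and a ``save_args`` option. The bucket name, file path and
credential placeholders are hypothetical; ``credentials`` is forwarded to
``S3FileSystem`` as ``client_kwargs``, and ``save_args`` entries such as
``protocol`` are passed straight through to ``pickle.dumps``:

import pickle

from kedro.io import PickleS3DataSet

# Hypothetical bucket, path and credential placeholders for illustration only.
data_set = PickleS3DataSet(
    filepath="models/model.pkl",
    bucket_name="my-example-bucket",
    credentials={
        "aws_access_key_id": "<YOUR_KEY_ID>",
        "aws_secret_access_key": "<YOUR_SECRET_KEY>",
    },
    save_args={"protocol": pickle.HIGHEST_PROTOCOL},
)

data_set.save({"weights": [0.1, 0.2, 0.3]})  # serialised via pickle.dumps
reloaded = data_set.load()                   # deserialised via pickle.loads
assert reloaded == {"weights": [0.1, 0.2, 0.3]}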