Source code for ayx_python_sdk.providers.amp_provider.repositories.dcm_repository

# Copyright (C) 2022 Alteryx, Inc. All rights reserved.
#
# Licensed under the ALTERYX SDK AND API LICENSE AGREEMENT;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    https://www.alteryx.com/alteryx-sdk-and-api-license-agreement
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class that saves and retrieves AMP DCM information."""
import datetime as dt
import logging
from typing import Dict, Optional

from ayx_python_sdk.core.exceptions import DcmEError, DcmEException
from ayx_python_sdk.providers.amp_provider.repositories.singleton import Singleton
from ayx_python_sdk.providers.amp_provider.resources.generated.dcm_e_pb2 import (
    DcmERequest,
    DcmEResponse,
)

from google.protobuf import json_format

import grpc

logger = logging.getLogger(__name__)


[docs]class DCMRepository(metaclass=Singleton): """Repository that stores DCM information.""" from ayx_python_sdk.providers.amp_provider.resources.generated.sdk_engine_service_pb2_grpc import ( SdkEngineStub, ) @staticmethod def __get_grpc_client() -> "SdkEngineStub": """ Get grpc sdk engine client. Parameters ---------- - """ from ayx_python_sdk.providers.amp_provider.repositories.grpc_repository import ( GrpcRepository, ) try: return GrpcRepository().get_sdk_engine_client() except ValueError: raise DcmEException( "Error getting GRPC client!", DcmEError("Error getting GRPC client!", 10001), ) @staticmethod def __handle_response(res: DcmEResponse) -> Dict: """Handle DcmE response.""" response_dict = json_format.MessageToDict(res.response) if not res.success: if ( "message" in response_dict.keys() and "errorCode" in response_dict.keys() ): if "detail" in response_dict.keys(): raise DcmEException( "Dcm.E Error!", DcmEError( response_dict["message"], int(response_dict["errorCode"]), response_dict["detail"], ), ) else: raise DcmEException( "Dcm.E Error!", DcmEError( response_dict["message"], int(response_dict["errorCode"]) ), ) else: raise DcmEException("Dcm.E Error!", DcmEError(res.response, 10003)) return response_dict
[docs] @staticmethod def get_connection(connection_id: str) -> Dict: """ Retrieve connection information including secrets by connection ID. Parameters ---------- connection_id string with UUID of connection """ logger.debug("Getting DCM connection for " + str(connection_id)) client = DCMRepository.__get_grpc_client() try: req = DcmERequest() req.v2.get_connection.connection_id = connection_id res = client.Dcm(req) except grpc.RpcError: raise DcmEException( "Error getting DCM connection!", DcmEError("Error getting DCM connection!"), 10002, ) return DCMRepository.__handle_response(res)
[docs] @staticmethod def update_connection_secret( connection_id: str, lock_id: str, role: str, secret_type: str, value: str, expires_on: Optional[dt.datetime], parameters: Optional[Dict[str, str]], ) -> Dict: """ Update a single secret for role and secret_type to value as well as the optional expires_on and parameters. Parameters ---------- connection_id A connection ID lock_id A lock ID acquired from get_write_lock() role A role such as ?oauth? secret_type A secret type such as ?oauth_token? value The new value to store for the secret expires_on (Optional) DateTime expiration of this secret parameters Dict of parameter values for this secret (this is arbitrary user data stored as JSON) """ from google.protobuf.struct_pb2 import Struct logger.debug("Updating DCM connection secret for " + str(connection_id)) client = DCMRepository.__get_grpc_client() try: req = DcmERequest() req.v2.update_secret.connection_id = connection_id req.v2.update_secret.lock_id = lock_id req.v2.update_secret.credential_role = role req.v2.update_secret.secret_type = secret_type req.v2.update_secret.value = value if parameters: st = Struct() st.update(parameters) req.v2.update_secret.parameters.CopyFrom(st) if expires_on: req.v2.update_secret.expires_on = expires_on.isoformat() res = client.Dcm(req) except grpc.RpcError: raise DcmEException( "Error getting DCM connection!", DcmEError("Error getting DCM connection!"), 10002, ) return DCMRepository.__handle_response(res)
[docs] @staticmethod def get_write_lock( connection_id: str, role: str, secret_type: str, expires_in: Optional[dt.datetime], ) -> Dict: """ Attempt to acquire an exclusive write lock. Parameters ---------- connection_id string with UUID of connection role A role such as ?oauth? secret_type A secret type such as ?oauth_token? expires_in (Optional) A DateTime value in which to ask for the lock to be held for in milliseconds. """ logger.debug("Getting DCM write lock for " + str(connection_id)) client = DCMRepository.__get_grpc_client() try: req = DcmERequest() req.v2.lock_secret.connection_id = connection_id req.v2.lock_secret.credential_role = role req.v2.lock_secret.secret_type = secret_type if expires_in: req.v2.lock_secret.expires_in = expires_in.isoformat() res = client.Dcm(req) except grpc.RpcError: raise DcmEException( "Error getting DCM connection!", DcmEError("Error getting DCM connection!"), 10002, ) return DCMRepository.__handle_response(res)
[docs] @staticmethod def free_write_lock( connection_id: str, role: str, secret_type: str, lock_id: str ) -> None: """ Frees a lock obtained from a previous call to get_write_lock(). Parameters ---------- connection_id string with UUID of connection role A role such as ?oauth? secret_type A secret type such as ?oauth_token? lock_id A lock_id acquired from a previous call to get_write_lock() """ logger.debug( "Freeing DCM write lock for " + str(connection_id) + " lock_id: " + str(lock_id) ) client = DCMRepository.__get_grpc_client() try: req = DcmERequest() req.v2.unlock_secret.connection_id = connection_id req.v2.unlock_secret.credential_role = role req.v2.unlock_secret.secret_type = secret_type req.v2.unlock_secret.lock_id = lock_id res = client.Dcm(req) except grpc.RpcError: raise DcmEException( "Error getting DCM connection!", DcmEError("Error getting DCM connection!"), 10002, ) DCMRepository.__handle_response(res)