Source code for ayx_python_sdk.providers.amp_provider.sdk_tool_service
# Copyright (C) 2022 Alteryx, Inc. All rights reserved.
#
# Licensed under the ALTERYX SDK AND API LICENSE AGREEMENT;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.alteryx.com/alteryx-sdk-and-api-license-agreement
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SDK Engine Service grpc class."""
import logging
import traceback
from typing import Any, Callable
from ayx_python_sdk.providers.amp_provider import AMPDriver, AMPProvider
from ayx_python_sdk.providers.amp_provider.repositories import (
EnvironmentRepository,
GrpcRepository,
InputAnchorRepository,
InputMetadataRepository,
InputRecordPacketRepository,
OutputAnchorRepository,
PluginClassRepository,
ToolConfigRepository,
clear_repositories,
)
from ayx_python_sdk.providers.amp_provider.resources.generated.sdk_tool_service_pb2_grpc import (
SdkToolServicer,
)
from ayx_python_sdk.providers.amp_provider.resources.generated.transport_pb2 import (
ReturnStatus,
)
import grpc
logger = logging.getLogger()
def _handle_service_exception(method: Callable) -> Callable:
def _handle_exception(obj, request, context) -> Any: # type: ignore
try:
return method(obj, request, context)
except Exception:
traceback_str = traceback.format_exc()
logger.exception(traceback_str)
context.set_details(traceback_str)
context.set_code(grpc.StatusCode.INTERNAL)
context.abort(code=grpc.StatusCode.INTERNAL, details=traceback_str)
return _handle_exception
[docs]class SdkToolService(SdkToolServicer):
"""SDK Tool Service GRPC overrides."""
[docs] @_handle_service_exception
def ConfirmSdkToolServiceConnection(self, request, context): # type: ignore # noqa: N802
"""Confirm healthy connection.
Parameters
----------
request: Empty
The incoming gRPC request - should be empty for this method
context: grpc.ServicerContext
Service-level context for this gRPC service
"""
logger.debug("ConfirmSdkToolServiceConnection called.")
status = ReturnStatus(message=f"Connection successful!", success=True)
return status
[docs] @_handle_service_exception
def InitializeSdkPlugin(self, request, context): # type: ignore # noqa: N802
"""
Initialize SDK Plugin with config, anchors, and connections.
Parameters
----------
request: PluginInitializationData
The incoming gRPC request - should contain tool config, engine constants, list of incoming and outgoing anchors, and a boolean updataMode.
context: grpc.ServicerContext
Service-level context for this gRPC service
"""
logger.debug("InitializeSdkPlugin called.")
AMPDriver().clear_state()
clear_repositories(exclude={GrpcRepository(), PluginClassRepository()})
ToolConfigRepository().save_xml_tool_config(request.configXml)
EnvironmentRepository().save_engine_constants(dict(request.engineConstants))
EnvironmentRepository().save_update_mode(request.updateMode)
for input_anchor in request.incomingAnchors:
logger.debug("Saving input anchor: \n%s\n", input_anchor)
InputAnchorRepository().save_grpc_anchor(input_anchor)
for output_anchor in request.outgoingAnchors:
logger.debug("Saving output anchor: \n%s\n", output_anchor)
OutputAnchorRepository().save_grpc_anchor(output_anchor)
amp_provider = AMPProvider()
sdk_plugin = PluginClassRepository().get_plugin_class()(amp_provider)
AMPDriver().plugin = sdk_plugin
for anchor in request.incomingAnchors:
for connection in anchor.connections:
logger.debug(
"Driving metadata for input anchor %s and connection %s.",
anchor.name,
connection.name,
)
AMPDriver().metadata_received(anchor.name, connection.name)
status = ReturnStatus(message=f"Initialization successful!", success=True)
return status
[docs] @_handle_service_exception
def PushIncomingRecordPacket(self, request, context): # type: ignore # noqa: N802
"""
Push a record packet to the plugin.
Parameters
----------
request: IncomingRecordPacketPush
The incoming gRPC request - should contain an anchor name, connection name, and the record packet to send.
context: grpc.ServicerContext
Service-level context for this gRPC service.
"""
logger.debug("PushIncomingRecordPacket called.")
logger.debug(
"Packet received on anchor %s and connection %s.",
request.anchor_name,
request.connection_name,
)
record_packet_metadata = InputMetadataRepository().get_metadata(
request.anchor_name, request.connection_name
)
InputRecordPacketRepository().save_grpc_record_packet(
anchor_name=request.anchor_name,
connection_name=request.connection_name,
grpc_record_packet=request.record_packet,
metadata=record_packet_metadata,
)
logger.debug(
"Driving record packet to plugin on anchor %s and connection %s.",
request.anchor_name,
request.connection_name,
)
AMPDriver().record_packet_received(
anchor_name=request.anchor_name,
connection_name=request.connection_name,
)
status = ReturnStatus(message="Record packet recieved!", success=True)
return status
[docs] @_handle_service_exception
def NotifyIncomingConnectionComplete(self, request, context): # type: ignore # noqa: N802
"""
Notify the plugin that a connection has closed.
Parameters
----------
request: IncomingConnectionComplete
The incoming gRPC request - should contain an anchor name and connection name.
context: grpc.ServicerContext
Service-level context for this gRPC service
"""
logger.debug("NotifyIncomingConnectionComplete called.")
AMPDriver().connection_closed_callback(
request.anchor_name, request.connection_name
)
logger.debug(
"Connection closed for anchor %s, connection %s",
request.anchor_name,
request.connection_name,
)
status = ReturnStatus(
message=f"Connection {request.connection_name} closed!", success=True
)
return status
[docs] @_handle_service_exception
def NotifyPluginComplete(self, request, context): # type: ignore # noqa: N802
"""
Notify the plugin that the on_complete method should be called.
Parameters
----------
request: Empty
The incoming gRPC request - should be empty for this method
context: grpc.ServicerContext
Service-level context for this gRPC service
"""
logger.debug("NotifyPluginComplete Called")
AMPDriver().complete_callback()
status = ReturnStatus(message=f"Plugin process complete!", success=True)
return status