Source code for ayx_plugin_sdk.tracer_bullet.tracer_bullet_plugin

# Copyright (C) 2020 Alteryx, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example pass through tool."""
from ayx_plugin_sdk.core import (
    InputConnectionBase,
    Plugin,
    ProviderBase,
    register_plugin,
)


class TracerBulletPlugin(Plugin):
    """Sample plugin that forwards record packets from its input to the "Output" anchor."""

    def __init__(self, provider: ProviderBase):
        """Set up plugin state and log details about the running environment.

        Keeps a reference to the provider and its tool configuration, looks up
        the "Output" anchor, requests a temp file, reports 10 progress, and
        emits a series of informational messages describing the environment.

        Args:
            provider: Object supplying the tool config, I/O channel,
                environment details and output anchors for this tool.
        """
        self.provider = provider
        self.config = provider.tool_config
        self.output_anchor = provider.get_output_anchor("Output")

        io = provider.io
        io.info(io.translate_msg("Tracer Bullet Started"))

        env = provider.environment
        if env.update_only:
            io.info(f"Update mode: {env.update_mode}")

        self.temp_file = io.create_temp_file(extension="tmp")
        io.update_progress(10)

        # Environment report; messages are emitted in this fixed order.
        io.info(f"Tool ID: {env.tool_id}")
        io.info(f"Designer version: {env.designer_version}")
        io.info(f"Alteryx Install Dir: {env.alteryx_install_dir}")
        io.info(f"Alteryx Locale: {env.alteryx_locale}")
        io.info(f"Workflow directory: {env.workflow_dir}")

    def on_input_connection_opened(self, input_connection: InputConnectionBase) -> None:
        """Open the output anchor with a clone of the incoming connection's metadata.

        Also sets ``max_packet_size`` on the connection to 100.

        Args:
            input_connection: The connection that was just opened.

        Raises:
            RuntimeError: If the connection carries no metadata.
        """
        metadata = input_connection.metadata
        if metadata is None:
            raise RuntimeError("Metadata should be set here.")

        input_connection.max_packet_size = 100
        # The anchor gets its own copy of the metadata, not the connection's object.
        self.output_anchor.open(metadata.clone())

    def on_record_packet(self, input_connection: InputConnectionBase) -> None:
        """Read one packet from the connection and write it unchanged to the output anchor.

        Args:
            input_connection: The connection with a packet ready to read.
        """
        self.output_anchor.write(input_connection.read())

    def on_complete(self) -> None:
        """Emit completion messages and push an updated tool configuration.

        Logs two info messages and one test warning, then stores ``42`` under
        the "New Tag" key of the config and hands the config to the
        environment's ``update_tool_config``.
        """
        io = self.provider.io
        io.info("Tracer bullet complete.")
        io.info(f"Temp file was created at: {self.temp_file}")
        io.warn("This is a test warning.")

        self.config["New Tag"] = 42
        self.provider.environment.update_tool_config(self.config)
# Pass the plugin class to the SDK's register_plugin and expose the result as
# the module-level name ``AyxPlugin``.
# NOTE(review): presumably this is the name the host looks up to load the tool — confirm.
AyxPlugin = register_plugin(TracerBulletPlugin)