# Source code for ayx_python_sdk.providers.amp_provider.builders.record_packet_builder

# Copyright (C) 2022 Alteryx, Inc. All rights reserved.
#
# Licensed under the ALTERYX SDK AND API LICENSE AGREEMENT;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    https://www.alteryx.com/alteryx-sdk-and-api-license-agreement
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Record packet building utilities for converting between core and protobuf."""
from typing import List, Tuple

from ayx_python_sdk.core.constants import NULL_VALUE_PLACEHOLDER
from ayx_python_sdk.core.metadata import Metadata
from ayx_python_sdk.core.record_packet_base import RecordPacketBase
from ayx_python_sdk.providers.amp_provider.amp_record_packet import AMPRecordPacket
from ayx_python_sdk.providers.amp_provider.builders.record_builder import RecordBuilder
from ayx_python_sdk.providers.amp_provider.resources.generated.record_packet_pb2 import (
    RecordPacket as ProtobufRecordPacket,
)


class RecordPacketBuilder:
    """Utilities for converting record packets between protobuf and core objects."""

    # Shared builder used to convert the individual records of a packet.
    record_builder = RecordBuilder()

    @classmethod
    def from_protobuf(
        cls, protobuf_record_packet: ProtobufRecordPacket, metadata: Metadata
    ) -> Tuple[RecordPacketBase, int, float]:
        """
        Convert a protobuf to a record packet.

        Parameters
        ----------
        protobuf_record_packet
            Protobuf representation of a record packet.
        metadata
            Metadata associated with the record packet. Its field names, in
            iteration order, become the column names of the dataframe.

        Returns
        -------
        Tuple[RecordPacketBase, int, float]
            AMPRecordPacket representation of the protobuf record packet,
            sequence, and progress
        """
        # pandas is imported at call time rather than at module import.
        import pandas as pd

        parsed_records = [
            cls.record_builder.from_protobuf(protobuf_record)
            for protobuf_record in protobuf_record_packet.records
        ]
        column_names = [field.name for field in metadata]

        return (
            AMPRecordPacket(
                metadata=metadata,
                df=pd.DataFrame(parsed_records, columns=column_names),
            ),
            protobuf_record_packet.sequence,
            protobuf_record_packet.progress,
        )

    @classmethod
    def to_protobuf(
        cls, amp_record_packet: RecordPacketBase, sequence: int, progress: float
    ) -> List["ProtobufRecordPacket"]:
        """
        Convert a record packet to a protobuf.

        Parameters
        ----------
        amp_record_packet
            Core record packet to convert to its protobuf representation.
        sequence
            Position that the current record packet would be in, out of all
            record packets. (First, second, third, etc.) When the packet is
            split, each additional protobuf packet gets the next sequence
            number.
        progress
            Overall progress (ranging from 0.0 to 1.0). The same value is
            attached to every protobuf packet produced by this call.

        Returns
        -------
        List[ProtobufRecordPacket]
            A list of ProtobufRecordPackets. If the passed in AMPRecordPacket
            would surpass 64MB, then there will be more than one element in
            list. A packet with no records yields an empty list.
        """
        # Use the non-inplace fillna so the frame returned by to_dataframe()
        # is left untouched; it may be the caller's own packet data.
        df = amp_record_packet.to_dataframe().fillna(NULL_VALUE_PLACEHOLDER)
        metadata = amp_record_packet.metadata

        protobuf_records = [
            cls.record_builder.to_protobuf(record, metadata)
            for record in df.itertuples(index=False)
        ]

        size_limit = 64 * (2 ** 20)  # 64 MB

        pb_record_packets = []
        current_records = []
        curr_size = 0
        for record in protobuf_records:
            # NOTE(review): len(record.data) is what counts toward the limit;
            # confirm that it measures the record's size in bytes.
            record_size = len(record.data)

            # Only flush when something has been accumulated. Otherwise a
            # first record that is larger than the limit on its own would
            # produce an empty packet and waste a sequence number.
            if current_records and curr_size + record_size > size_limit:
                pb_record_packets.append(
                    ProtobufRecordPacket(
                        sequence=sequence,
                        progress=progress,
                        records=current_records,
                    )
                )
                sequence += 1
                current_records = []
                curr_size = 0

            current_records.append(record)
            curr_size += record_size

        # Emit whatever is left over after the last flush.
        if current_records:
            pb_record_packets.append(
                ProtobufRecordPacket(
                    sequence=sequence,
                    progress=progress,
                    records=current_records,
                )
            )

        return pb_record_packets