Source code for ydb.types

# -*- coding: utf-8 -*-
from __future__ import annotations

import abc
from dataclasses import dataclass
import enum
import json
from . import _utilities, _apis
from datetime import date, datetime, timedelta, timezone
import typing
import uuid
import struct
from google.protobuf import struct_pb2

from . import table


# Type checkers and IDEs resolve ydb_value_pb2 from the v4 protos; at runtime the "common" protos package is used.
if typing.TYPE_CHECKING:
    from ._grpc.v4.protos import ydb_value_pb2
else:
    from ._grpc.common.protos import ydb_value_pb2


_SECONDS_IN_DAY = 60 * 60 * 24
_EPOCH = datetime(1970, 1, 1)
_EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _from_date(x: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> typing.Union[date, int]:
    if table_client_settings is not None and table_client_settings._native_date_in_result_sets:
        return _EPOCH.date() + timedelta(days=x.uint32_value)
    return x.uint32_value


def _to_date(pb: ydb_value_pb2.Value, value: typing.Union[date, datetime, int]) -> None:
    if isinstance(value, datetime):
        pb.uint32_value = (value.date() - _EPOCH.date()).days
    elif isinstance(value, date):
        pb.uint32_value = (value - _EPOCH.date()).days
    else:
        pb.uint32_value = value


def _from_date32(x: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> typing.Union[date, int]:
    if table_client_settings is not None and table_client_settings._native_date_in_result_sets:
        return _EPOCH.date() + timedelta(days=x.int32_value)
    return x.int32_value


def _to_date32(pb: ydb_value_pb2.Value, value: typing.Union[date, int]) -> None:
    if isinstance(value, date):
        pb.int32_value = (value - _EPOCH.date()).days
    else:
        pb.int32_value = value


def _from_datetime_number(
    x: typing.Union[float, datetime], table_client_settings: table.TableClientSettings
) -> typing.Union[float, datetime]:
    if table_client_settings is not None and table_client_settings._native_datetime_in_result_sets:
        # x is float when native_datetime_in_result_sets is True
        return datetime.utcfromtimestamp(typing.cast(float, x))
    return x


def _from_json(x: typing.Union[str, bytearray, bytes], table_client_settings: table.TableClientSettings) -> typing.Any:
    if table_client_settings is not None and table_client_settings._native_json_in_result_sets:
        return json.loads(x)
    return x


def _to_uuid(value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> uuid.UUID:
    return uuid.UUID(bytes_le=struct.pack("QQ", value_pb.low_128, value_pb.high_128))


def _from_uuid(pb: ydb_value_pb2.Value, value: uuid.UUID) -> None:
    pb.low_128 = struct.unpack("Q", value.bytes_le[0:8])[0]
    pb.high_128 = struct.unpack("Q", value.bytes_le[8:16])[0]


def _timedelta_to_microseconds(value: timedelta) -> int:
    """
    Converts a timedelta into a whole number of microseconds

    :param value: A timedelta to convert
    :return: Total microseconds, computed with integer arithmetic only
    """
    whole_seconds = value.days * _SECONDS_IN_DAY + value.seconds
    return whole_seconds * 1000000 + value.microseconds


def _from_interval(
    value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings
) -> typing.Union[timedelta, int]:
    if table_client_settings is not None and table_client_settings._native_interval_in_result_sets:
        return timedelta(microseconds=value_pb.int64_value)
    return value_pb.int64_value


def _to_interval(pb: ydb_value_pb2.Value, value: typing.Union[timedelta, int]) -> None:
    if isinstance(value, timedelta):
        pb.int64_value = _timedelta_to_microseconds(value)
    else:
        pb.int64_value = value


def _from_timestamp(
    value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings
) -> typing.Union[datetime, int]:
    if table_client_settings is not None and table_client_settings._native_timestamp_in_result_sets:
        return _EPOCH + timedelta(microseconds=value_pb.uint64_value)
    return value_pb.uint64_value


def _to_timestamp(pb: ydb_value_pb2.Value, value: typing.Union[datetime, int]) -> None:
    if isinstance(value, datetime):
        if value.tzinfo:
            epoch = _EPOCH_UTC
        else:
            epoch = _EPOCH
        pb.uint64_value = _timedelta_to_microseconds(value - epoch)
    else:
        pb.uint64_value = value


def _from_timestamp64(
    value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings
) -> typing.Union[datetime, int]:
    if table_client_settings is not None and table_client_settings._native_timestamp_in_result_sets:
        return _EPOCH + timedelta(microseconds=value_pb.int64_value)
    return value_pb.int64_value


def _to_timestamp64(pb: ydb_value_pb2.Value, value: typing.Union[datetime, int]) -> None:
    if isinstance(value, datetime):
        if value.tzinfo:
            epoch = _EPOCH_UTC
        else:
            epoch = _EPOCH
        pb.int64_value = _timedelta_to_microseconds(value - epoch)
    else:
        pb.int64_value = value


@enum.unique
class PrimitiveType(enum.Enum):
    """
    Enumerates all available primitive types that can be used
    in computations.

    Every member value is a tuple of
    ``(type id, protobuf field name or None, decoder, encoder)``;
    the last two items are optional (see ``__init__``).
    """

    Int32 = _apis.primitive_types.INT32, "int32_value"
    Uint32 = _apis.primitive_types.UINT32, "uint32_value"
    Int64 = _apis.primitive_types.INT64, "int64_value"
    Uint64 = _apis.primitive_types.UINT64, "uint64_value"
    Int8 = _apis.primitive_types.INT8, "int32_value"
    Uint8 = _apis.primitive_types.UINT8, "uint32_value"
    Int16 = _apis.primitive_types.INT16, "int32_value"
    Uint16 = _apis.primitive_types.UINT16, "uint32_value"
    Bool = _apis.primitive_types.BOOL, "bool_value"
    Double = _apis.primitive_types.DOUBLE, "double_value"
    Float = _apis.primitive_types.FLOAT, "float_value"

    String = _apis.primitive_types.STRING, "bytes_value"
    Utf8 = _apis.primitive_types.UTF8, "text_value"

    Yson = _apis.primitive_types.YSON, "bytes_value"
    Json = _apis.primitive_types.JSON, "text_value", _from_json
    JsonDocument = _apis.primitive_types.JSON_DOCUMENT, "text_value", _from_json
    UUID = (_apis.primitive_types.UUID, None, _to_uuid, _from_uuid)

    Date = (
        _apis.primitive_types.DATE,
        None,
        _from_date,
        _to_date,
    )
    Date32 = (
        _apis.primitive_types.DATE32,
        None,
        _from_date32,
        _to_date32,
    )
    Datetime = (
        _apis.primitive_types.DATETIME,
        "uint32_value",
        _from_datetime_number,
    )
    Datetime64 = (
        _apis.primitive_types.DATETIME64,
        "int64_value",
        _from_datetime_number,
    )
    Timestamp = (
        _apis.primitive_types.TIMESTAMP,
        None,
        _from_timestamp,
        _to_timestamp,
    )
    Timestamp64 = (
        _apis.primitive_types.TIMESTAMP64,
        None,
        _from_timestamp64,
        _to_timestamp64,
    )
    Interval = (
        _apis.primitive_types.INTERVAL,
        None,
        _from_interval,
        _to_interval,
    )
    Interval64 = (
        _apis.primitive_types.INTERVAL64,
        None,
        _from_interval,
        _to_interval,
    )

    DyNumber = _apis.primitive_types.DYNUMBER, "text_value"

    def __init__(
        self,
        idn: ydb_value_pb2.Type.PrimitiveTypeId,
        proto_field: typing.Optional[str],
        to_obj: typing.Optional[typing.Callable[..., typing.Any]] = None,
        from_obj: typing.Optional[typing.Callable[..., None]] = None,
    ) -> None:
        """
        Unpacks the member value tuple

        :param idn: A primitive type id
        :param proto_field: Name of the protobuf value field, or None when the
            converters work on the whole protobuf value
        :param to_obj: Optional decoder (protobuf -> Python)
        :param from_obj: Optional encoder (Python -> protobuf)
        """
        self._idn_ = idn
        self._proto_field = proto_field
        self._to_obj = to_obj
        self._from_obj = from_obj

    def get_value(self, value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> typing.Any:
        """
        Extracts value from protocol buffer

        :param value_pb: A protocol buffer
        :param table_client_settings: Client settings forwarded to the decoder
        :return: A valid value of primitive type
        """
        decode = self._to_obj
        field = self._proto_field
        if decode is None:
            # No decoder: the field content is the value itself.
            assert field is not None
            return getattr(value_pb, field)
        # With a field name the decoder gets the field content, otherwise
        # it gets the whole protobuf value.
        source = getattr(value_pb, field) if field else value_pb
        return decode(source, table_client_settings)

    def set_value(self, pb: ydb_value_pb2.Value, value: typing.Any) -> None:
        """
        Sets value in a protocol buffer

        :param pb: A protocol buffer
        :param value: A valid value to set
        :return: None
        """
        encode = self._from_obj
        if encode:
            encode(pb, value)
            return
        assert self._proto_field is not None
        setattr(pb, self._proto_field, value)

    def __str__(self) -> str:
        """Returns the member name."""
        return self._name_

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a primitive type

        :return: A protocol buffer representation
        """
        return _apis.ydb_value.Type(type_id=self._idn_)
class DataQuery:
    """
    Holds a query text together with its parameter types and a name.

    When no name is supplied, one is derived from the query text with
    ``_utilities.get_query_hash``.
    """

    __slots__ = ("yql_text", "parameters_types", "name")

    def __init__(
        self, query_id: str, parameters_types: "dict[str, ydb_value_pb2.Type]", name: typing.Optional[str] = None
    ):
        """
        :param query_id: The query text (stored as ``yql_text``)
        :param parameters_types: Mapping of parameter name to protobuf type
        :param name: Optional explicit query name
        """
        self.yql_text = query_id
        self.parameters_types = parameters_types
        if name is None:
            name = _utilities.get_query_hash(self.yql_text)
        self.name = name
#######################
# A deprecated alias  #
#######################
DataType = PrimitiveType


class AbstractTypeBuilder(abc.ABC):
    """Base class of the type builders; each one exposes its type as ``proto``."""

    @property
    @abc.abstractmethod
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a type

        :return: A protocol buffer representation
        """
class DecimalType(AbstractTypeBuilder):
    """
    Represents a decimal type described by its precision and scale.

    Two instances compare equal when precision and scale match, and they
    hash accordingly.
    """

    __slots__ = ("_proto", "_precision", "_scale")

    def __init__(self, precision: int = 22, scale: int = 9) -> None:
        """
        Represents a decimal type

        :param precision: A precision value
        :param scale: A scale value
        """
        self._precision = precision
        self._scale = scale
        self._proto = _apis.ydb_value.Type()
        self._proto.decimal_type.MergeFrom(_apis.ydb_value.DecimalType(precision=self._precision, scale=self._scale))

    @property
    def precision(self) -> int:
        """Returns the precision given at construction."""
        return self._precision

    @property
    def scale(self) -> int:
        """Returns the scale given at construction."""
        return self._scale

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a type

        :return: A protocol buffer representation
        """
        return self._proto

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DecimalType):
            return NotImplemented
        return self._precision == other.precision and self._scale == other.scale

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None; keep instances
        # hashable and consistent with equality.
        return hash((self._precision, self._scale))

    def __str__(self) -> str:
        """
        Returns string representation of a type

        :return: A string representation
        """
        return "Decimal(%d,%d)" % (self._precision, self._scale)
class NullType(AbstractTypeBuilder):
    """Type builder whose protobuf type has ``null_type`` set."""

    __slots__ = ("_repr", "_proto")

    def __init__(self) -> None:
        null_proto = _apis.ydb_value.Type(null_type=struct_pb2.NULL_VALUE)  # type: ignore[arg-type]
        self._proto = null_proto

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a type

        :return: A protocol buffer representation
        """
        return self._proto

    def __str__(self) -> str:
        return "NullType"
class OptionalType(AbstractTypeBuilder):
    """
    Represents optional type that wraps inner type.

    Two instances compare equal when their inner types compare equal, and
    they hash accordingly.
    """

    __slots__ = ("_repr", "_proto", "_item")

    def __init__(self, optional_type: typing.Union[AbstractTypeBuilder, PrimitiveType]) -> None:
        """
        Represents optional type that wraps inner type

        :param optional_type: An instance of an inner type
        """
        self._repr = "%s?" % str(optional_type)
        self._proto = _apis.ydb_value.Type()
        self._item = optional_type
        self._proto.optional_type.MergeFrom(_apis.ydb_value.OptionalType(item=optional_type.proto))

    @property
    def item(self) -> typing.Union[AbstractTypeBuilder, PrimitiveType]:
        """Returns the wrapped inner type."""
        return self._item

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a type

        :return: A protocol buffer representation
        """
        return self._proto

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OptionalType):
            return NotImplemented
        return self._item == other.item

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None; hash by the wrapped
        # type so equal instances hash equally. Raises TypeError if the
        # wrapped type itself is unhashable.
        return hash((OptionalType, self._item))

    def __str__(self) -> str:
        return self._repr
class ListType(AbstractTypeBuilder):
    """Represents a list type with a single item type."""

    __slots__ = ("_repr", "_proto")

    def __init__(self, list_type: typing.Union[AbstractTypeBuilder, PrimitiveType]) -> None:
        """
        :param list_type: List item type builder
        """
        self._repr = "List<%s>" % str(list_type)
        inner = _apis.ydb_value.ListType(item=list_type.proto)
        self._proto = _apis.ydb_value.Type(list_type=inner)

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of type

        :return: A protocol buffer representation
        """
        return self._proto

    def __str__(self) -> str:
        return self._repr
class DictType(AbstractTypeBuilder):
    """Represents a dict type with a key type and a payload type."""

    # Slot names must match the attributes assigned in __init__. The former
    # double-underscore names were name-mangled and therefore never used.
    __slots__ = ("_repr", "_proto")

    def __init__(
        self,
        key_type: typing.Union[AbstractTypeBuilder, PrimitiveType],
        payload_type: typing.Union[AbstractTypeBuilder, PrimitiveType],
    ) -> None:
        """
        :param key_type: Key type builder
        :param payload_type: Payload type builder
        """
        self._repr = "Dict<%s,%s>" % (str(key_type), str(payload_type))
        self._proto = _apis.ydb_value.Type(
            dict_type=_apis.ydb_value.DictType(
                key=key_type.proto,
                payload=payload_type.proto,
            )
        )

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a type

        :return: A protocol buffer representation
        """
        return self._proto

    def __str__(self) -> str:
        return self._repr
class SetType(AbstractTypeBuilder):
    """Represents a set type, encoded as a dict type whose payload type is void."""

    # Slot names must match the attributes assigned in __init__. The former
    # double-underscore names were name-mangled and therefore never used.
    __slots__ = ("_repr", "_proto")

    def __init__(
        self,
        key_type: typing.Union[AbstractTypeBuilder, PrimitiveType],
    ) -> None:
        """
        :param key_type: Key type builder
        """
        self._repr = "Set<%s>" % (str(key_type))
        self._proto = _apis.ydb_value.Type(
            dict_type=_apis.ydb_value.DictType(
                key=key_type.proto,
                payload=_apis.ydb_value.Type(void_type=struct_pb2.NULL_VALUE),  # type: ignore[arg-type]
            )
        )

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a type

        :return: A protocol buffer representation
        """
        return self._proto

    def __str__(self) -> str:
        return self._repr
class TupleType(AbstractTypeBuilder):
    """Builds a tuple type one element at a time via ``add_element``."""

    __slots__ = ("__elements_repr", "__proto")

    def __init__(self) -> None:
        self.__proto = _apis.ydb_value.Type(tuple_type=_apis.ydb_value.TupleType())
        self.__elements_repr: typing.List[str] = []

    def add_element(self, element_type: typing.Union[AbstractTypeBuilder, PrimitiveType]) -> "TupleType":
        """
        :param element_type: Adds additional element of tuple
        :return: self
        """
        self.__elements_repr.append(str(element_type))
        # Append an empty element to the protobuf and copy the type into it.
        self.__proto.tuple_type.elements.add().MergeFrom(element_type.proto)
        return self

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a type

        :return: A protocol buffer representation
        """
        return self.__proto

    def __str__(self) -> str:
        joined = ",".join(self.__elements_repr)
        return "Tuple<%s>" % joined
class StructType(AbstractTypeBuilder):
    """Builds a struct type one named member at a time via ``add_member``."""

    __slots__ = ("__members_repr", "__proto")

    def __init__(self) -> None:
        self.__proto = _apis.ydb_value.Type(struct_type=_apis.ydb_value.StructType())
        self.__members_repr: typing.List[str] = []

    def add_member(self, name: str, member_type: typing.Union[AbstractTypeBuilder, PrimitiveType]) -> "StructType":
        """
        :param name: A member name
        :param member_type: A member type builder
        :return: self
        """
        self.__members_repr.append("%s:%s" % (name, str(member_type)))
        new_member = self.__proto.struct_type.members.add()
        new_member.name = name
        new_member.type.MergeFrom(member_type.proto)
        return self

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a type

        :return: A protocol buffer representation
        """
        return self.__proto

    def __str__(self) -> str:
        joined = ",".join(self.__members_repr)
        return "Struct<%s>" % joined
class BulkUpsertColumns(AbstractTypeBuilder):
    """Builds a struct type one named column at a time via ``add_column``."""

    __slots__ = ("__columns_repr", "__proto")

    def __init__(self) -> None:
        self.__proto = _apis.ydb_value.Type(struct_type=_apis.ydb_value.StructType())
        self.__columns_repr: typing.List[str] = []

    def add_column(
        self, name: str, column_type: typing.Union[AbstractTypeBuilder, PrimitiveType]
    ) -> "BulkUpsertColumns":
        """
        :param name: A column name
        :param column_type: A column type
        :return: self
        """
        self.__columns_repr.append("%s:%s" % (name, column_type))
        new_column = self.__proto.struct_type.members.add()
        new_column.name = name
        new_column.type.MergeFrom(column_type.proto)
        return self

    @property
    def proto(self) -> ydb_value_pb2.Type:
        """
        Returns protocol buffer representation of a type

        :return: A protocol buffer representation
        """
        return self.__proto

    def __str__(self) -> str:
        joined = ",".join(self.__columns_repr)
        return "BulkUpsertColumns<%s>" % joined
@dataclass
class TypedValue:
    # Pairs a Python value with an optional explicit type.

    # The Python-side value.
    value: typing.Any
    # Primitive type or type builder describing ``value``; None when unspecified.
    value_type: typing.Optional[typing.Union[PrimitiveType, AbstractTypeBuilder]] = None