# -*- coding: utf-8 -*-
from __future__ import annotations
import abc
from dataclasses import dataclass
import enum
import json
from . import _utilities, _apis
from datetime import date, datetime, timedelta, timezone
import typing
import uuid
import struct
from google.protobuf import struct_pb2
from . import table
# Workaround for good IDE and universal for runtime
if typing.TYPE_CHECKING:
from ._grpc.v4.protos import ydb_value_pb2
else:
from ._grpc.common.protos import ydb_value_pb2
# Number of seconds in one day; used when flattening a timedelta into microseconds.
_SECONDS_IN_DAY = 24 * 60 * 60
# Unix epoch as a naive datetime (subtracted from naive datetime values).
_EPOCH = datetime(year=1970, month=1, day=1)
# Unix epoch as a timezone-aware UTC datetime (subtracted from aware datetime values).
_EPOCH_UTC = _EPOCH.replace(tzinfo=timezone.utc)
def _from_date(x: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> typing.Union[date, int]:
    """
    Reads a Date value from a protocol buffer
    :param x: A protocol buffer whose uint32_value holds the number of days since the epoch
    :param table_client_settings: Client settings, may be None
    :return: A date object when native dates are enabled in the settings, the raw day count otherwise
    """
    days_since_epoch = x.uint32_value
    native_requested = table_client_settings is not None and table_client_settings._native_date_in_result_sets
    if not native_requested:
        return days_since_epoch
    return _EPOCH.date() + timedelta(days=days_since_epoch)
def _to_date(pb: ydb_value_pb2.Value, value: typing.Union[date, int]) -> None:
    """
    Writes a Date value into a protocol buffer
    :param pb: A protocol buffer whose uint32_value is assigned
    :param value: A date object or an already computed number of days since the epoch
    :return: None
    """
    if not isinstance(value, date):
        # Not a date object: store the value unchanged.
        pb.uint32_value = value
        return
    elapsed = value - _EPOCH.date()
    pb.uint32_value = elapsed.days
def _from_datetime_number(
    x: typing.Union[int, float], table_client_settings: table.TableClientSettings
) -> typing.Union[datetime, int, float]:
    """
    Converts a number of seconds since the epoch into a datetime, if requested
    :param x: Seconds since 1970-01-01 00:00:00 UTC
    :param table_client_settings: Client settings, may be None
    :return: A naive datetime expressed in UTC when native datetimes are enabled
    in the settings, the number unchanged otherwise
    """
    if table_client_settings is not None and table_client_settings._native_datetime_in_result_sets:
        # datetime.utcfromtimestamp() is deprecated since Python 3.12. Building an aware
        # UTC value and dropping tzinfo yields the same naive UTC datetime as before.
        return datetime.fromtimestamp(x, tz=timezone.utc).replace(tzinfo=None)
    return x
def _from_json(x: typing.Union[str, bytearray, bytes], table_client_settings: table.TableClientSettings):
    """
    Decodes a JSON document, if requested
    :param x: A serialized JSON document
    :param table_client_settings: Client settings, may be None
    :return: The result of json.loads when native JSON is enabled in the settings,
    the serialized document unchanged otherwise
    """
    wants_native = table_client_settings is not None and table_client_settings._native_json_in_result_sets
    return json.loads(x) if wants_native else x
def _to_uuid(value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings) -> uuid.UUID:
    """
    Builds a UUID from the two 64-bit halves stored in a protocol buffer
    :param value_pb: A protocol buffer with low_128 and high_128 fields
    :param table_client_settings: Client settings (not used by this conversion)
    :return: A UUID object
    """
    # uuid.UUID(bytes_le=...) expects a fixed little-endian layout, so the byte order is
    # pinned with "<" instead of relying on the byte order of the host platform.
    return uuid.UUID(bytes_le=struct.pack("<QQ", value_pb.low_128, value_pb.high_128))
def _from_uuid(pb: ydb_value_pb2.Value, value: uuid.UUID):
    """
    Stores a UUID into a protocol buffer as two 64-bit halves
    :param pb: A protocol buffer whose low_128 and high_128 fields are assigned
    :param value: A UUID object
    :return: None
    """
    # bytes_le has a fixed layout; decode both halves explicitly as little-endian
    # so that the result does not depend on the byte order of the host platform.
    pb.low_128, pb.high_128 = struct.unpack("<QQ", value.bytes_le)
def _from_interval(
    value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings
) -> typing.Union[timedelta, int]:
    """
    Reads an Interval value from a protocol buffer
    :param value_pb: A protocol buffer whose int64_value holds a number of microseconds
    :param table_client_settings: Client settings, may be None
    :return: A timedelta when native intervals are enabled in the settings,
    the raw number of microseconds otherwise
    """
    micros = value_pb.int64_value
    if table_client_settings is None or not table_client_settings._native_interval_in_result_sets:
        return micros
    return timedelta(microseconds=micros)
def _timedelta_to_microseconds(value: timedelta) -> int:
    """
    Flattens a timedelta into a whole number of microseconds
    :param value: A timedelta object
    :return: The total number of microseconds as an integer
    """
    whole_seconds = value.days * _SECONDS_IN_DAY + value.seconds
    return whole_seconds * 1000000 + value.microseconds
def _to_interval(pb: ydb_value_pb2.Value, value: typing.Union[timedelta, int]):
    """
    Writes an Interval value into a protocol buffer
    :param pb: A protocol buffer whose int64_value is assigned
    :param value: A timedelta object or an already computed number of microseconds
    :return: None
    """
    if not isinstance(value, timedelta):
        # Not a timedelta: store the value unchanged.
        pb.int64_value = value
        return
    pb.int64_value = _timedelta_to_microseconds(value)
def _from_timestamp(
    value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings
) -> typing.Union[datetime, int]:
    """
    Reads a Timestamp value from a protocol buffer
    :param value_pb: A protocol buffer whose uint64_value holds microseconds since the epoch
    :param table_client_settings: Client settings, may be None
    :return: A naive datetime when native timestamps are enabled in the settings,
    the raw number of microseconds otherwise
    """
    micros = value_pb.uint64_value
    if table_client_settings is None or not table_client_settings._native_timestamp_in_result_sets:
        return micros
    return _EPOCH + timedelta(microseconds=micros)
def _to_timestamp(pb: ydb_value_pb2.Value, value: typing.Union[datetime, int]):
    """
    Writes a Timestamp value into a protocol buffer
    :param pb: A protocol buffer whose uint64_value is assigned
    :param value: A datetime object or an already computed number of microseconds since the epoch
    :return: None
    """
    if not isinstance(value, datetime):
        # Not a datetime: store the value unchanged.
        pb.uint64_value = value
        return
    # The epoch must match the value: the aware UTC epoch when tzinfo is set, the naive one otherwise.
    reference = _EPOCH_UTC if value.tzinfo else _EPOCH
    pb.uint64_value = _timedelta_to_microseconds(value - reference)
@enum.unique
class PrimitiveType(enum.Enum):
    """
    Enumerates all available primitive types that can be used
    in computations.

    Each member value is a tuple of a primitive type id, the name of the
    protocol buffer field carrying the value (or None) and, optionally,
    a converter used when reading and a converter used when writing.
    """

    # Types stored in a protocol buffer field as is
    Int32 = (_apis.primitive_types.INT32, "int32_value")
    Uint32 = (_apis.primitive_types.UINT32, "uint32_value")
    Int64 = (_apis.primitive_types.INT64, "int64_value")
    Uint64 = (_apis.primitive_types.UINT64, "uint64_value")
    Int8 = (_apis.primitive_types.INT8, "int32_value")
    Uint8 = (_apis.primitive_types.UINT8, "uint32_value")
    Int16 = (_apis.primitive_types.INT16, "int32_value")
    Uint16 = (_apis.primitive_types.UINT16, "uint32_value")
    Bool = (_apis.primitive_types.BOOL, "bool_value")
    Double = (_apis.primitive_types.DOUBLE, "double_value")
    Float = (_apis.primitive_types.FLOAT, "float_value")
    String = (_apis.primitive_types.STRING, "bytes_value")
    Utf8 = (_apis.primitive_types.UTF8, "text_value")
    Yson = (_apis.primitive_types.YSON, "bytes_value")

    # Types read from a field and then passed through a converter
    Json = (_apis.primitive_types.JSON, "text_value", _from_json)
    JsonDocument = (_apis.primitive_types.JSON_DOCUMENT, "text_value", _from_json)
    Datetime = (_apis.primitive_types.DATETIME, "uint32_value", _from_datetime_number)

    # Types without a field name: converters receive the whole protocol buffer
    UUID = (_apis.primitive_types.UUID, None, _to_uuid, _from_uuid)
    Date = (_apis.primitive_types.DATE, None, _from_date, _to_date)
    Timestamp = (_apis.primitive_types.TIMESTAMP, None, _from_timestamp, _to_timestamp)
    Interval = (_apis.primitive_types.INTERVAL, None, _from_interval, _to_interval)

    DyNumber = (_apis.primitive_types.DYNUMBER, "text_value")

    def __init__(
        self, idn: ydb_value_pb2.Type.PrimitiveTypeId, proto_field: typing.Optional[str], to_obj=None, from_obj=None
    ):
        """
        Unpacks a member value tuple
        :param idn: A primitive type id
        :param proto_field: A name of the protocol buffer field that carries the value, or None
        :param to_obj: An optional callable used by get_value
        :param from_obj: An optional callable used by set_value
        """
        self._idn_ = idn
        self._proto_field = proto_field
        self._to_obj = to_obj
        self._from_obj = from_obj

    def get_value(self, value_pb: ydb_value_pb2.Value, table_client_settings: table.TableClientSettings):
        """
        Extracts value from protocol buffer
        :param value_pb: A protocol buffer
        :param table_client_settings: Client settings forwarded to the read converter
        :return: A valid value of primitive type
        """
        converter = self._to_obj
        if converter is None:
            # No converter: the field content is the value.
            return getattr(value_pb, self._proto_field)
        # With a field name the converter gets the field content, otherwise the whole message.
        source = getattr(value_pb, self._proto_field) if self._proto_field else value_pb
        return converter(source, table_client_settings)

    def set_value(self, pb: ydb_value_pb2.Value, value):
        """
        Sets value in a protocol buffer
        :param pb: A protocol buffer
        :param value: A valid value to set
        :return: None
        """
        writer = self._from_obj
        if not writer:
            setattr(pb, self._proto_field, value)
            return
        writer(pb, value)

    def __str__(self):
        """
        Returns string representation of a type
        :return: A name of the enum member
        """
        return self._name_

    @property
    def proto(self):
        """
        Returns protocol buffer representation of a primitive type
        :return: A protocol buffer representation
        """
        type_id = self._idn_
        return _apis.ydb_value.Type(type_id=type_id)
# NOTE: a stray "[docs]" marker (documentation-link residue, not code) stood on this line.
class DataQuery(object):
    """
    Holds a query text together with its parameter types and a name
    """

    __slots__ = ("yql_text", "parameters_types", "name")

    def __init__(
        self, query_id: str, parameters_types: "dict[str, ydb_value_pb2.Type]", name: typing.Optional[str] = None
    ):
        """
        :param query_id: A query text, stored as yql_text
        :param parameters_types: A mapping of parameter names to their types
        :param name: An optional query name; when None, the name is computed from the query text
        with _utilities.get_query_hash
        """
        self.yql_text = query_id
        self.parameters_types = parameters_types
        if name is None:
            name = _utilities.get_query_hash(self.yql_text)
        self.name = name
#######################
# A deprecated alias #
#######################
# DataType is a second name bound to the very same enum class as PrimitiveType.
DataType = PrimitiveType
class AbstractTypeBuilder(metaclass=abc.ABCMeta):
    """
    Base class of type builders.

    A concrete builder has to provide the ``proto`` property. The metaclass is
    passed with the Python 3 ``metaclass=`` keyword: a ``__metaclass__`` class
    attribute is ignored by Python 3, which left ``abc.abstractmethod`` unenforced.

    No ``__slots__`` are declared here, so instances keep a ``__dict__``.
    """

    @property
    @abc.abstractmethod
    def proto(self):
        """
        Returns protocol buffer representation of a type
        :return: A protocol buffer representation
        """
        pass
class DecimalType(AbstractTypeBuilder):
    """
    Builder of a decimal type with the given precision and scale
    """

    __slots__ = ("_proto", "_precision", "_scale")

    def __init__(self, precision=22, scale=9):
        """
        Represents a decimal type
        :param precision: A precision value
        :param scale: A scale value
        """
        self._precision = precision
        self._scale = scale
        self._proto = _apis.ydb_value.Type()
        self._proto.decimal_type.MergeFrom(_apis.ydb_value.DecimalType(precision=self._precision, scale=self._scale))

    @property
    def precision(self):
        """
        :return: A precision value passed to the constructor
        """
        return self._precision

    @property
    def scale(self):
        """
        :return: A scale value passed to the constructor
        """
        return self._scale

    @property
    def proto(self):
        """
        Returns protocol buffer representation of a type
        :return: A protocol buffer representation
        """
        return self._proto

    def __eq__(self, other):
        """
        Compares two decimal types by precision and scale
        :param other: An object to compare with
        :return: True or False for another DecimalType, NotImplemented for anything else
        """
        # Objects of unrelated types are handed back to Python's default comparison
        # protocol instead of failing with AttributeError on a missing attribute.
        if not isinstance(other, DecimalType):
            return NotImplemented
        return self._precision == other.precision and self._scale == other.scale

    def __hash__(self):
        """
        Returns a hash consistent with __eq__ (defining __eq__ alone makes a class unhashable)
        :return: A hash of the precision and scale pair
        """
        return hash((self._precision, self._scale))

    def __str__(self):
        """
        Returns string representation of a type
        :return: A string representation
        """
        return "Decimal(%d,%d)" % (self._precision, self._scale)
class NullType(AbstractTypeBuilder):
    """
    Builder of a null type
    """

    __slots__ = ("_repr", "_proto")

    def __init__(self):
        null_proto = _apis.ydb_value.Type(null_type=struct_pb2.NULL_VALUE)
        self._proto = null_proto

    @property
    def proto(self):
        """
        Returns protocol buffer representation of a type
        :return: A protocol buffer representation
        """
        return self._proto

    def __str__(self):
        """
        Returns string representation of a type
        :return: A string representation
        """
        return "NullType"
class OptionalType(AbstractTypeBuilder):
    """
    Builder of an optional type wrapping an inner type
    """

    __slots__ = ("_repr", "_proto", "_item")

    def __init__(self, optional_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
        """
        Represents optional type that wraps inner type
        :param optional_type: An instance of an inner type
        """
        self._repr = "%s?" % str(optional_type)
        self._proto = _apis.ydb_value.Type()
        self._item = optional_type
        self._proto.optional_type.MergeFrom(_apis.ydb_value.OptionalType(item=optional_type.proto))

    @property
    def item(self):
        """
        :return: An inner type passed to the constructor
        """
        return self._item

    @property
    def proto(self):
        """
        Returns protocol buffer representation of a type
        :return: A protocol buffer representation
        """
        return self._proto

    def __eq__(self, other):
        """
        Compares two optional types by their inner types
        :param other: An object to compare with
        :return: True or False for another OptionalType, NotImplemented for anything else
        """
        # Without this guard, comparing against an object that has no "item" attribute
        # (for instance an inner type met while comparing differently nested optionals)
        # failed with AttributeError instead of evaluating to "not equal".
        if not isinstance(other, OptionalType):
            return NotImplemented
        return self._item == other.item

    def __hash__(self):
        """
        Returns a hash consistent with __eq__ (defining __eq__ alone makes a class unhashable)
        :return: A hash derived from the inner type; raises TypeError if the inner type is unhashable
        """
        return hash((OptionalType, self._item))

    def __str__(self):
        """
        Returns string representation of a type
        :return: A string representation
        """
        return self._repr
class ListType(AbstractTypeBuilder):
    """
    Builder of a list type with the given item type
    """

    __slots__ = ("_repr", "_proto")

    def __init__(self, list_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
        """
        :param list_type: List item type builder
        """
        self._repr = "List<%s>" % str(list_type)
        inner = _apis.ydb_value.ListType(item=list_type.proto)
        self._proto = _apis.ydb_value.Type(list_type=inner)

    @property
    def proto(self):
        """
        Returns protocol buffer representation of type
        :return: A protocol buffer representation
        """
        return self._proto

    def __str__(self):
        """
        Returns string representation of a type
        :return: A string representation
        """
        return self._repr
class DictType(AbstractTypeBuilder):
    """
    Builder of a dict type with the given key and payload types
    """

    # The slot names match the attributes assigned in __init__. Names starting with two
    # underscores would be name-mangled and would never be used by this class.
    __slots__ = ("_repr", "_proto")

    def __init__(
        self,
        key_type: typing.Union[AbstractTypeBuilder, PrimitiveType],
        payload_type: typing.Union[AbstractTypeBuilder, PrimitiveType],
    ):
        """
        :param key_type: Key type builder
        :param payload_type: Payload type builder
        """
        self._repr = "Dict<%s,%s>" % (str(key_type), str(payload_type))
        self._proto = _apis.ydb_value.Type(
            dict_type=_apis.ydb_value.DictType(
                key=key_type.proto,
                payload=payload_type.proto,
            )
        )

    @property
    def proto(self):
        """
        Returns protocol buffer representation of a type
        :return: A protocol buffer representation
        """
        return self._proto

    def __str__(self):
        """
        Returns string representation of a type
        :return: A string representation
        """
        return self._repr
class TupleType(AbstractTypeBuilder):
    """
    Builder of a tuple type; elements are appended one by one with add_element
    """

    __slots__ = ("__elements_repr", "__proto")

    def __init__(self):
        self.__proto = _apis.ydb_value.Type(tuple_type=_apis.ydb_value.TupleType())
        self.__elements_repr = []

    def add_element(self, element_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
        """
        :param element_type: Adds additional element of tuple
        :return: self
        """
        self.__elements_repr.append(str(element_type))
        # A new element is created in the protocol buffer first and then filled in.
        self.__proto.tuple_type.elements.add().MergeFrom(element_type.proto)
        return self

    @property
    def proto(self):
        """
        Returns protocol buffer representation of a type
        :return: A protocol buffer representation
        """
        return self.__proto

    def __str__(self):
        """
        Returns string representation of a type
        :return: A string representation
        """
        joined = ",".join(self.__elements_repr)
        return "Tuple<%s>" % joined
class StructType(AbstractTypeBuilder):
    """
    Builder of a struct type; members are appended one by one with add_member
    """

    __slots__ = ("__members_repr", "__proto")

    def __init__(self):
        self.__proto = _apis.ydb_value.Type(struct_type=_apis.ydb_value.StructType())
        self.__members_repr = []

    def add_member(self, name: str, member_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
        """
        :param name: A member name
        :param member_type: A member type
        :return: self
        """
        described = "%s:%s" % (name, str(member_type))
        self.__members_repr.append(described)
        new_member = self.__proto.struct_type.members.add()
        new_member.name = name
        new_member.type.MergeFrom(member_type.proto)
        return self

    @property
    def proto(self):
        """
        Returns protocol buffer representation of a type
        :return: A protocol buffer representation
        """
        return self.__proto

    def __str__(self):
        """
        Returns string representation of a type
        :return: A string representation
        """
        joined = ",".join(self.__members_repr)
        return "Struct<%s>" % joined
class BulkUpsertColumns(AbstractTypeBuilder):
    """
    Builder of a column set, represented as a struct type; columns are appended with add_column
    """

    __slots__ = ("__columns_repr", "__proto")

    def __init__(self):
        self.__proto = _apis.ydb_value.Type(struct_type=_apis.ydb_value.StructType())
        self.__columns_repr = []

    def add_column(self, name: str, column_type: typing.Union[AbstractTypeBuilder, PrimitiveType]):
        """
        :param name: A column name
        :param column_type: A column type
        :return: self
        """
        described = "%s:%s" % (name, column_type)
        self.__columns_repr.append(described)
        new_column = self.__proto.struct_type.members.add()
        new_column.name = name
        new_column.type.MergeFrom(column_type.proto)
        return self

    @property
    def proto(self):
        """
        Returns protocol buffer representation of a type
        :return: A protocol buffer representation
        """
        return self.__proto

    def __str__(self):
        """
        Returns string representation of a type
        :return: A string representation
        """
        joined = ",".join(self.__columns_repr)
        return "BulkUpsertColumns<%s>" % joined
@dataclass
class TypedValue:
    # Pairs a value with an optional description of its type.
    # The value itself; the annotation accepts any Python object.
    value: typing.Any
    # The type of the value: a PrimitiveType member or an AbstractTypeBuilder instance.
    # Defaults to None when no type is given.
    value_type: typing.Optional[typing.Union[PrimitiveType, AbstractTypeBuilder]] = None