# -*- coding: utf-8 -*-
import abc
import ydb
from abc import abstractmethod
import logging
import enum
import typing
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
from . import (
issues,
convert,
settings as settings_impl,
scheme,
types,
_utilities,
_apis,
_sp_impl,
_session_impl,
_tx_ctx_impl,
tracing,
)
from .retries import (
YdbRetryOperationFinalResult, # noqa
YdbRetryOperationSleepOpt, # noqa
BackoffSettings, # noqa
retry_operation_impl, # noqa
RetrySettings,
retry_operation_sync,
)
try:
from . import interceptor
except ImportError:
interceptor = None
_default_allow_split_transaction = False
logger = logging.getLogger(__name__)
##################################################################
# A deprecated aliases in case when direct import has been used #
##################################################################
SessionPoolEmpty = issues.SessionPoolEmpty
DataQuery = types.DataQuery
class DescribeTableSettings(settings_impl.BaseRequestSettings):
def __init__(self):
super(DescribeTableSettings, self).__init__()
self.include_shard_key_bounds = False
self.include_table_stats = False
def with_include_shard_key_bounds(self, value):
self.include_shard_key_bounds = value
return self
def with_include_table_stats(self, value):
self.include_table_stats = value
return self
class ExecDataQuerySettings(settings_impl.BaseRequestSettings):
def __init__(self):
super(ExecDataQuerySettings, self).__init__()
self.keep_in_cache = True
def with_keep_in_cache(self, value):
self.keep_in_cache = value
return self
class KeyBound(object):
__slots__ = ("_equal", "value", "type")
def __init__(self, key_value, key_type=None, inclusive=False):
"""
Represents key bound.
:param key_value: An iterable with key values
:param key_type: A type of key
:param inclusive: A flag that indicates bound includes key provided in the value.
"""
try:
iter(key_value)
except TypeError:
assert False, "value must be iterable!"
if isinstance(key_type, types.TupleType):
key_type = key_type.proto
self._equal = inclusive
self.value = key_value
self.type = key_type
def is_inclusive(self):
return self._equal
def is_exclusive(self):
return not self._equal
def __str__(self):
if self._equal:
return "InclusiveKeyBound(Tuple%s)" % str(self.value)
return "ExclusiveKeyBound(Tuple%s)" % str(self.value)
@classmethod
def inclusive(cls, key_value, key_type):
return cls(key_value, key_type, True)
@classmethod
def exclusive(cls, key_value, key_type):
return cls(key_value, key_type, False)
class KeyRange(object):
__slots__ = ("from_bound", "to_bound")
def __init__(self, from_bound, to_bound):
self.from_bound = from_bound
self.to_bound = to_bound
def __repr__(self):
return self.__str__()
def __str__(self):
return "KeyRange(%s, %s)" % (str(self.from_bound), str(self.to_bound))
class Column(object):
def __init__(self, name, type, family=None):
self._name = name
self._type = type
self.family = family
def __eq__(self, other):
return self.name == other.name and self._type.item == other.type.item
@property
def name(self):
return self._name
@property
def type(self):
return self._type
def with_family(self, family):
self.family = family
return self
@property
def type_pb(self):
try:
return self._type.proto
except Exception:
return self._type
@enum.unique
class FeatureFlag(enum.IntEnum):
UNSPECIFIED = 0
ENABLED = 1
DISABLED = 2
@enum.unique
class AutoPartitioningPolicy(enum.IntEnum):
AUTO_PARTITIONING_POLICY_UNSPECIFIED = 0
DISABLED = 1
AUTO_SPLIT = 2
AUTO_SPLIT_MERGE = 3
@enum.unique
class IndexStatus(enum.IntEnum):
INDEX_STATUS_UNSPECIFIED = 0
READY = 1
BUILDING = 2
class CachingPolicy(object):
def __init__(self):
self._pb = _apis.ydb_table.CachingPolicy()
self.preset_name = None
def with_preset_name(self, preset_name):
self._pb.preset_name = preset_name
self.preset_name = preset_name
return self
def to_pb(self):
return self._pb
class ExecutionPolicy(object):
def __init__(self):
self._pb = _apis.ydb_table.ExecutionPolicy()
self.preset_name = None
def with_preset_name(self, preset_name):
self._pb.preset_name = preset_name
self.preset_name = preset_name
return self
def to_pb(self):
return self._pb
class CompactionPolicy(object):
def __init__(self):
self._pb = _apis.ydb_table.CompactionPolicy()
self.preset_name = None
def with_preset_name(self, preset_name):
self._pb.preset_name = preset_name
self.preset_name = preset_name
return self
def to_pb(self):
return self._pb
class SplitPoint(object):
def __init__(self, *args):
self._value = tuple(args)
@property
def value(self):
return self._value
class ExplicitPartitions(object):
def __init__(self, split_points):
self.split_points = split_points
class PartitioningPolicy(object):
def __init__(self):
self._pb = _apis.ydb_table.PartitioningPolicy()
self.preset_name = None
self.uniform_partitions = None
self.auto_partitioning = None
self.explicit_partitions = None
def with_preset_name(self, preset_name):
self._pb.preset_name = preset_name
self.preset_name = preset_name
return self
def with_uniform_partitions(self, uniform_partitions):
self._pb.uniform_partitions = uniform_partitions
self.uniform_partitions = uniform_partitions
return self
def with_explicit_partitions(self, explicit_partitions):
self.explicit_partitions = explicit_partitions
return self
def with_auto_partitioning(self, auto_partitioning):
self._pb.auto_partitioning = auto_partitioning
self.auto_partitioning = auto_partitioning
return self
def to_pb(self, table_description):
if self.explicit_partitions is not None:
column_types = {}
pk = set(table_description.primary_key)
for column in table_description.columns:
if column.name in pk:
column_types[column.name] = column.type
for split_point in self.explicit_partitions.split_points:
typed_value = self._pb.explicit_partitions.split_points.add()
split_point_type = types.TupleType()
prefix_size = len(split_point.value)
for pl_el_id, pk_name in enumerate(table_description.primary_key):
if pl_el_id >= prefix_size:
break
split_point_type.add_element(column_types[pk_name])
typed_value.type.MergeFrom(split_point_type.proto)
typed_value.value.MergeFrom(convert.from_native_value(split_point_type.proto, split_point.value))
return self._pb
class TableIndex(object):
def __init__(self, name):
self._pb = _apis.ydb_table.TableIndex()
self._pb.name = name
self.name = name
self.index_columns = []
self.data_columns = []
# output only.
self.status = None
def with_global_index(self):
self._pb.global_index.SetInParent()
return self
def with_global_async_index(self):
self._pb.global_async_index.SetInParent()
return self
def with_index_columns(self, *columns):
for column in columns:
self._pb.index_columns.append(column)
self.index_columns.append(column)
return self
def with_data_columns(self, *columns):
for column in columns:
self._pb.data_columns.append(column)
self.data_columns.append(column)
return self
def to_pb(self):
return self._pb
class ReplicationPolicy(object):
def __init__(self):
self._pb = _apis.ydb_table.ReplicationPolicy()
self.preset_name = None
self.replicas_count = None
self.allow_promotion = None
self.create_per_availability_zone = None
def with_preset_name(self, preset_name):
self._pb.preset_name = preset_name
self.preset_name = preset_name
return self
def with_replicas_count(self, replicas_count):
self._pb.replicas_count = replicas_count
self.replicas_count = replicas_count
return self
def with_create_per_availability_zone(self, create_per_availability_zone):
self._pb.create_per_availability_zone = create_per_availability_zone
self.create_per_availability_zone = create_per_availability_zone
return self
def with_allow_promotion(self, allow_promotion):
self._pb.allow_promotion = allow_promotion
self.allow_promotion = allow_promotion
return self
def to_pb(self):
return self._pb
class StoragePool(object):
def __init__(self, media):
self.media = media
def to_pb(self):
return _apis.ydb_table.StoragePool(media=self.media)
class StoragePolicy(object):
def __init__(self):
self._pb = _apis.ydb_table.StoragePolicy()
self.preset_name = None
self.syslog = None
self.log = None
self.data = None
self.keep_in_memory = None
self.external = None
def with_preset_name(self, preset_name):
self._pb.preset_name = preset_name
self.preset_name = preset_name
return self
def with_syslog_storage_settings(self, syslog_settings):
self._pb.syslog.MergeFrom(syslog_settings.to_pb())
self.syslog = syslog_settings
return self
def with_log_storage_settings(self, log_settings):
self._pb.log.MergeFrom(log_settings.to_pb())
self.log = log_settings
return self
def with_data_storage_settings(self, data_settings):
self._pb.data.MergeFrom(data_settings.to_pb())
self.data = data_settings
return self
def with_external_storage_settings(self, external_settings):
self._pb.external.MergeFrom(external_settings.to_pb())
self.external = external_settings
return self
def with_keep_in_memory(self, keep_in_memory):
self._pb.keep_in_memory = keep_in_memory
self.keep_in_memory = keep_in_memory
return self
def to_pb(self):
return self._pb
class TableProfile(object):
def __init__(self):
self.preset_name = None
self.compaction_policy = None
self.partitioning_policy = None
self.storage_policy = None
self.execution_policy = None
self.replication_policy = None
self.caching_policy = None
def with_preset_name(self, preset_name):
self.preset_name = preset_name
return self
def with_compaction_policy(self, compaction_policy):
self.compaction_policy = compaction_policy
return self
def with_partitioning_policy(self, partitioning_policy):
self.partitioning_policy = partitioning_policy
return self
def with_execution_policy(self, execution_policy):
self.execution_policy = execution_policy
return self
def with_caching_policy(self, caching_policy):
self.caching_policy = caching_policy
return self
def with_storage_policy(self, storage_policy):
self.storage_policy = storage_policy
return self
def with_replication_policy(self, replication_policy):
self.replication_policy = replication_policy
return self
def to_pb(self, table_description):
pb = _apis.ydb_table.TableProfile()
if self.preset_name is not None:
pb.preset_name = self.preset_name
if self.execution_policy is not None:
pb.execution_policy.MergeFrom(self.execution_policy.to_pb())
if self.storage_policy is not None:
pb.storage_policy.MergeFrom(self.storage_policy.to_pb())
if self.replication_policy is not None:
pb.replication_policy.MergeFrom(self.replication_policy.to_pb())
if self.caching_policy is not None:
pb.caching_policy.MergeFrom(self.caching_policy.to_pb())
if self.compaction_policy is not None:
pb.compaction_policy.MergeFrom(self.compaction_policy.to_pb())
if self.partitioning_policy is not None:
pb.partitioning_policy.MergeFrom(self.partitioning_policy.to_pb(table_description))
return pb
class DateTypeColumnModeSettings(object):
def __init__(self, column_name, expire_after_seconds=0):
self.column_name = column_name
self.expire_after_seconds = expire_after_seconds
def to_pb(self):
pb = _apis.ydb_table.DateTypeColumnModeSettings()
pb.column_name = self.column_name
pb.expire_after_seconds = self.expire_after_seconds
return pb
@enum.unique
class ColumnUnit(enum.IntEnum):
UNIT_UNSPECIFIED = 0
UNIT_SECONDS = 1
UNIT_MILLISECONDS = 2
UNIT_MICROSECONDS = 3
UNIT_NANOSECONDS = 4
class ValueSinceUnixEpochModeSettings(object):
def __init__(self, column_name, column_unit, expire_after_seconds=0):
self.column_name = column_name
self.column_unit = column_unit
self.expire_after_seconds = expire_after_seconds
def to_pb(self):
pb = _apis.ydb_table.ValueSinceUnixEpochModeSettings()
pb.column_name = self.column_name
pb.column_unit = self.column_unit
pb.expire_after_seconds = self.expire_after_seconds
return pb
class TtlSettings(object):
def __init__(self):
self.date_type_column = None
self.value_since_unix_epoch = None
def with_date_type_column(self, column_name, expire_after_seconds=0):
self.date_type_column = DateTypeColumnModeSettings(column_name, expire_after_seconds)
return self
def with_value_since_unix_epoch(self, column_name, column_unit, expire_after_seconds=0):
self.value_since_unix_epoch = ValueSinceUnixEpochModeSettings(column_name, column_unit, expire_after_seconds)
return self
def to_pb(self):
pb = _apis.ydb_table.TtlSettings()
if self.date_type_column is not None:
pb.date_type_column.MergeFrom(self.date_type_column.to_pb())
elif self.value_since_unix_epoch is not None:
pb.value_since_unix_epoch.MergeFrom(self.value_since_unix_epoch.to_pb())
else:
raise RuntimeError("Unspecified ttl settings mode")
return pb
class TableStats(object):
def __init__(self):
self.partitions = None
self.store_size = 0
def with_store_size(self, store_size):
self.store_size = store_size
return self
def with_partitions(self, partitions):
self.partitions = partitions
return self
class ReadReplicasSettings(object):
def __init__(self):
self.per_az_read_replicas_count = 0
self.any_az_read_replicas_count = 0
def with_any_az_read_replicas_count(self, any_az_read_replicas_count):
self.any_az_read_replicas_count = any_az_read_replicas_count
return self
def with_per_az_read_replicas_count(self, per_az_read_replicas_count):
self.per_az_read_replicas_count = per_az_read_replicas_count
return self
def to_pb(self):
pb = _apis.ydb_table.ReadReplicasSettings()
if self.per_az_read_replicas_count > 0:
pb.per_az_read_replicas_count = self.per_az_read_replicas_count
elif self.any_az_read_replicas_count > 0:
pb.any_az_read_replicas_count = self.any_az_read_replicas_count
return pb
class PartitioningSettings(object):
def __init__(self):
self.partitioning_by_size = 0
self.partition_size_mb = 0
self.partitioning_by_load = 0
self.min_partitions_count = 0
self.max_partitions_count = 0
def with_max_partitions_count(self, max_partitions_count):
self.max_partitions_count = max_partitions_count
return self
def with_min_partitions_count(self, min_partitions_count):
self.min_partitions_count = min_partitions_count
return self
def with_partitioning_by_load(self, partitioning_by_load):
self.partitioning_by_load = partitioning_by_load
return self
def with_partition_size_mb(self, partition_size_mb):
self.partition_size_mb = partition_size_mb
return self
def with_partitioning_by_size(self, partitioning_by_size):
self.partitioning_by_size = partitioning_by_size
return self
def to_pb(self):
pb = _apis.ydb_table.PartitioningSettings()
pb.partitioning_by_size = self.partitioning_by_size
pb.partition_size_mb = self.partition_size_mb
pb.partitioning_by_load = self.partitioning_by_load
pb.min_partitions_count = self.min_partitions_count
pb.max_partitions_count = self.max_partitions_count
return pb
class StorageSettings(object):
def __init__(self):
self.tablet_commit_log0 = None
self.tablet_commit_log1 = None
self.external = None
self.store_external_blobs = 0
def with_store_external_blobs(self, store_external_blobs):
self.store_external_blobs = store_external_blobs
return self
def with_external(self, external):
self.external = external
return self
def with_tablet_commit_log1(self, tablet_commit_log1):
self.tablet_commit_log1 = tablet_commit_log1
return self
def with_tablet_commit_log0(self, tablet_commit_log0):
self.tablet_commit_log0 = tablet_commit_log0
return self
def to_pb(self):
st = _apis.ydb_table.StorageSettings()
st.store_external_blobs = self.store_external_blobs
if self.external:
st.external.MergeFrom(self.external.to_pb())
if self.tablet_commit_log0:
st.tablet_commit_log0.MergeFrom(self.tablet_commit_log0.to_pb())
if self.tablet_commit_log1:
st.tablet_commit_log1.MergeFrom(self.tablet_commit_log1.to_pb())
return st
@enum.unique
class Compression(enum.IntEnum):
UNSPECIFIED = 0
NONE = 1
LZ4 = 2
class ColumnFamily(object):
def __init__(self):
self.compression = 0
self.name = None
self.data = None
self.keep_in_memory = 0
def with_name(self, name):
self.name = name
return self
def with_compression(self, compression):
self.compression = compression
return self
def with_data(self, data):
self.data = data
return self
def with_keep_in_memory(self, keep_in_memory):
self.keep_in_memory = keep_in_memory
return self
def to_pb(self):
cm = _apis.ydb_table.ColumnFamily()
cm.keep_in_memory = self.keep_in_memory
cm.compression = self.compression
if self.name is not None:
cm.name = self.name
if self.data is not None:
cm.data.MergeFrom(self.data.to_pb())
return cm
class TableDescription(object):
def __init__(self):
self.columns = []
self.primary_key = []
self.profile = None
self.indexes = []
self.column_families = []
self.ttl_settings = None
self.attributes = {}
self.uniform_partitions = 0
self.partition_at_keys = None
self.compaction_policy = None
self.key_bloom_filter = 0
self.read_replicas_settings = None
self.partitioning_settings = None
self.storage_settings = None
def with_storage_settings(self, storage_settings):
self.storage_settings = storage_settings
return self
def with_column(self, column):
self.columns.append(column)
return self
def with_columns(self, *columns):
for column in columns:
self.with_column(column)
return self
def with_primary_key(self, key):
self.primary_key.append(key)
return self
def with_primary_keys(self, *keys):
for pk in keys:
self.with_primary_key(pk)
return self
def with_column_family(self, column_family):
self.column_families.append(column_family)
return self
def with_column_families(self, *column_families):
for column_family in column_families:
self.with_column_family(column_family)
return self
def with_indexes(self, *indexes):
for index in indexes:
self.with_index(index)
return self
def with_index(self, index):
self.indexes.append(index)
return self
def with_profile(self, profile):
self.profile = profile
return self
def with_ttl(self, ttl_settings):
self.ttl_settings = ttl_settings
return self
def with_attributes(self, attributes):
self.attributes = attributes
return self
def with_uniform_partitions(self, uniform_partitions):
self.uniform_partitions = uniform_partitions
return self
def with_partition_at_keys(self, partition_at_keys):
self.partition_at_keys = partition_at_keys
return self
def with_key_bloom_filter(self, key_bloom_filter):
self.key_bloom_filter = key_bloom_filter
return self
def with_partitioning_settings(self, partitioning_settings):
self.partitioning_settings = partitioning_settings
return self
def with_read_replicas_settings(self, read_replicas_settings):
self.read_replicas_settings = read_replicas_settings
return self
def with_compaction_policy(self, compaction_policy):
self.compaction_policy = compaction_policy
return self
class AbstractTransactionModeBuilder(abc.ABC):
@property
@abc.abstractmethod
def name(self):
pass
@property
@abc.abstractmethod
def settings(self):
pass
class SnapshotReadOnly(AbstractTransactionModeBuilder):
__slots__ = ("_pb", "_name")
def __init__(self):
self._pb = _apis.ydb_table.SnapshotModeSettings()
self._name = "snapshot_read_only"
@property
def settings(self):
return self._pb
@property
def name(self):
return self._name
class SerializableReadWrite(AbstractTransactionModeBuilder):
__slots__ = ("_pb", "_name")
def __init__(self):
self._name = "serializable_read_write"
self._pb = _apis.ydb_table.SerializableModeSettings()
@property
def settings(self):
return self._pb
@property
def name(self):
return self._name
class OnlineReadOnly(AbstractTransactionModeBuilder):
__slots__ = ("_pb", "_name")
def __init__(self):
self._pb = _apis.ydb_table.OnlineModeSettings()
self._pb.allow_inconsistent_reads = False
self._name = "online_read_only"
def with_allow_inconsistent_reads(self):
self._pb.allow_inconsistent_reads = True
return self
@property
def settings(self):
return self._pb
@property
def name(self):
return self._name
class StaleReadOnly(AbstractTransactionModeBuilder):
__slots__ = ("_pb", "_name")
def __init__(self):
self._pb = _apis.ydb_table.StaleModeSettings()
self._name = "stale_read_only"
@property
def settings(self):
return self._pb
@property
def name(self):
return self._name
[docs]
class TableClientSettings(object):
def __init__(self):
self._client_query_cache_enabled = False
self._native_datetime_in_result_sets = False
self._native_date_in_result_sets = False
self._make_result_sets_lazy = False
self._native_json_in_result_sets = False
self._native_interval_in_result_sets = False
self._native_timestamp_in_result_sets = False
self._allow_truncated_result = convert._default_allow_truncated_result
[docs]
def with_native_timestamp_in_result_sets(self, enabled):
# type:(bool) -> ydb.TableClientSettings
self._native_timestamp_in_result_sets = enabled
return self
[docs]
def with_native_interval_in_result_sets(self, enabled):
# type:(bool) -> ydb.TableClientSettings
self._native_interval_in_result_sets = enabled
return self
[docs]
def with_native_json_in_result_sets(self, enabled):
# type:(bool) -> ydb.TableClientSettings
self._native_json_in_result_sets = enabled
return self
[docs]
def with_native_date_in_result_sets(self, enabled):
# type:(bool) -> ydb.TableClientSettings
self._native_date_in_result_sets = enabled
return self
[docs]
def with_native_datetime_in_result_sets(self, enabled):
# type:(bool) -> ydb.TableClientSettings
self._native_datetime_in_result_sets = enabled
return self
[docs]
def with_client_query_cache(self, enabled):
# type:(bool) -> ydb.TableClientSettings
self._client_query_cache_enabled = enabled
return self
[docs]
def with_lazy_result_sets(self, enabled):
# type:(bool) -> ydb.TableClientSettings
self._make_result_sets_lazy = enabled
return self
[docs]
def with_allow_truncated_result(self, enabled):
# type:(bool) -> ydb.TableClientSettings
self._allow_truncated_result = enabled
return self
class ScanQueryResult(object):
def __init__(self, result, table_client_settings):
self._result = result
self.query_stats = result.query_stats
self.result_set = convert.ResultSet.from_message(self._result.result_set, table_client_settings)
@enum.unique
class QueryStatsCollectionMode(enum.IntEnum):
NONE = _apis.ydb_table.QueryStatsCollection.Mode.STATS_COLLECTION_NONE
BASIC = _apis.ydb_table.QueryStatsCollection.Mode.STATS_COLLECTION_BASIC
FULL = _apis.ydb_table.QueryStatsCollection.Mode.STATS_COLLECTION_FULL
class ScanQuerySettings(settings_impl.BaseRequestSettings):
def __init__(self):
super(ScanQuerySettings, self).__init__()
self.collect_stats = None
def with_collect_stats(self, collect_stats_mode):
self.collect_stats = collect_stats_mode
return self
class ScanQuery(object):
def __init__(self, yql_text, parameters_types):
self.yql_text = yql_text
self.parameters_types = parameters_types
def _wrap_scan_query_response(response, table_client_settings):
issues._process_response(response)
return ScanQueryResult(response.result, table_client_settings)
def _scan_query_request_factory(query, parameters=None, settings=None):
if not isinstance(query, ScanQuery):
query = ScanQuery(query, {})
parameters = {} if parameters is None else parameters
collect_stats = getattr(
settings,
"collect_stats",
_apis.ydb_table.QueryStatsCollection.Mode.STATS_COLLECTION_NONE,
)
return _apis.ydb_table.ExecuteScanQueryRequest(
mode=_apis.ydb_table.ExecuteScanQueryRequest.Mode.MODE_EXEC,
query=_apis.ydb_table.Query(yql_text=query.yql_text),
parameters=convert.parameters_to_pb(query.parameters_types, parameters),
collect_stats=collect_stats,
)
class ISession(abc.ABC):
@abstractmethod
def __init__(self, driver, table_client_settings):
pass
@abstractmethod
def __lt__(self, other):
pass
@abstractmethod
def __eq__(self, other):
pass
@property
@abstractmethod
def session_id(self):
pass
@abstractmethod
def initialized(self):
"""
Return True if session is successfully initialized with a session_id and False otherwise.
"""
pass
@abstractmethod
def pending_query(self):
pass
@abstractmethod
def reset(self):
"""
Perform session state reset (that includes cleanup of the session_id, query cache, and etc.)
"""
pass
@abstractmethod
def read_table(
self,
path,
key_range=None,
columns=(),
ordered=False,
row_limit=None,
settings=None,
use_snapshot=None,
):
"""
Perform an read table request.
:param path: A path to the table
:param key_range: (optional) A KeyRange instance that describes a range to read. The KeyRange instance\
should include from_bound and/or to_bound. Each of the bounds (if provided) should specify a value of the\
key bound, and type of the key prefix. See an example above.
:param columns: (optional) An iterable with table columns to read.
:param ordered: (optional) A flag that indicates that result should be ordered.
:param row_limit: (optional) A number of rows to read.
:param settings: Request settings
:return: SyncResponseIterator instance
"""
pass
@abstractmethod
def keep_alive(self, settings=None):
pass
@abstractmethod
def create(self, settings=None):
pass
@abstractmethod
def delete(self, settings=None):
pass
@abstractmethod
def execute_scheme(self, yql_text, settings=None):
pass
@abstractmethod
def transaction(self, tx_mode=None, allow_split_transactions=None):
pass
@abstractmethod
def has_prepared(self, query):
pass
@abstractmethod
def prepare(self, query, settings=None):
pass
@abstractmethod
def explain(self, yql_text, settings=None):
"""
Expiremental API.
:param yql_text:
:param settings:
:return:
"""
pass
@abstractmethod
def create_table(self, path, table_description, settings=None):
"""
Create a YDB table.
:param path: A table path
:param table_description: A description of table to create. An instance TableDescription
:param settings: An instance of BaseRequestSettings that describes how rpc should invoked.
:return: A description of created scheme entry or error otherwise.
"""
pass
@abstractmethod
def drop_table(self, path, settings=None):
pass
@abstractmethod
def alter_table(
self,
path,
add_columns=None,
drop_columns=None,
settings=None,
alter_attributes=None,
add_indexes=None,
drop_indexes=None,
set_ttl_settings=None,
drop_ttl_settings=None,
add_column_families=None,
alter_column_families=None,
alter_storage_settings=None,
set_compaction_policy=None,
alter_partitioning_settings=None,
set_key_bloom_filter=None,
set_read_replicas_settings=None,
):
pass
@abstractmethod
def copy_table(self, source_path, destination_path, settings=None):
pass
@abstractmethod
def copy_tables(self, source_destination_pairs, settings=None):
pass
def describe_table(self, path, settings=None):
"""
Returns a description of the table by provided path
:param path: A table path
:param settings: A request settings
:return: Description of a table
"""
pass
class ITableClient(abc.ABC):
def __init__(self, driver, table_client_settings=None):
pass
@abstractmethod
def session(self):
pass
@abstractmethod
def scan_query(self, query, parameters=None, settings=None):
pass
@abstractmethod
def bulk_upsert(self, table_path, rows, column_types, settings=None):
"""
Bulk upsert data
:param table_path: A table path.
:param rows: A list of structures.
:param column_types: Bulk upsert column types.
"""
pass
class BaseTableClient(ITableClient):
def __init__(self, driver, table_client_settings=None):
# type:(ydb.Driver, ydb.TableClientSettings) -> None
self._driver = driver
self._table_client_settings = TableClientSettings() if table_client_settings is None else table_client_settings
def session(self):
# type: () -> ydb.Session
return Session(self._driver, self._table_client_settings)
def scan_query(self, query, parameters=None, settings=None):
# type: (ydb.ScanQuery, tuple, ydb.BaseRequestSettings) -> ydb.SyncResponseIterator
request = _scan_query_request_factory(query, parameters, settings)
stream_it = self._driver(
request,
_apis.TableService.Stub,
_apis.TableService.StreamExecuteScanQuery,
settings=settings,
)
return _utilities.SyncResponseIterator(
stream_it,
lambda resp: _wrap_scan_query_response(resp, self._table_client_settings),
)
def bulk_upsert(self, table_path, rows, column_types, settings=None):
# type: (str, list, ydb.AbstractTypeBuilder | ydb.PrimitiveType, ydb.BaseRequestSettings) -> None
"""
Bulk upsert data
:param table_path: A table path.
:param rows: A list of structures.
:param column_types: Bulk upsert column types.
"""
return self._driver(
_session_impl.bulk_upsert_request_factory(table_path, rows, column_types),
_apis.TableService.Stub,
_apis.TableService.BulkUpsert,
_session_impl.wrap_operation_bulk_upsert,
settings,
(),
)
[docs]
class TableClient(BaseTableClient):
def __init__(self, driver, table_client_settings=None):
# type:(ydb.Driver, ydb.TableClientSettings) -> None
super().__init__(driver=driver, table_client_settings=table_client_settings)
self._pool: Optional[SessionPool] = None
def __del__(self):
self._stop_pool_if_needed()
[docs]
def async_scan_query(self, query, parameters=None, settings=None):
# type: (ydb.ScanQuery, tuple, ydb.BaseRequestSettings) -> ydb.AsyncResponseIterator
request = _scan_query_request_factory(query, parameters, settings)
stream_it = self._driver(
request,
_apis.TableService.Stub,
_apis.TableService.StreamExecuteScanQuery,
settings=settings,
)
return _utilities.AsyncResponseIterator(
stream_it,
lambda resp: _wrap_scan_query_response(resp, self._table_client_settings),
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_bulk_upsert(self, table_path, rows, column_types, settings=None):
# type: (str, list, ydb.AbstractTypeBuilder | ydb.PrimitiveType, ydb.BaseRequestSettings) -> None
return self._driver.future(
_session_impl.bulk_upsert_request_factory(table_path, rows, column_types),
_apis.TableService.Stub,
_apis.TableService.BulkUpsert,
_session_impl.wrap_operation_bulk_upsert,
settings,
(),
)
def _init_pool_if_needed(self):
if self._pool is None:
self._pool = SessionPool(self._driver, 10)
def _stop_pool_if_needed(self, timeout=10):
if self._pool is not None:
self._pool.stop(timeout=timeout)
[docs]
def create_table(
self,
path: str,
table_description: "TableDescription",
settings: Optional["settings_impl.BaseRequestSettings"] = None,
) -> "ydb.Operation":
"""
Create a YDB table.
:param path: A table path
:param table_description: TableDescription instanse.
:param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
:return: Operation or YDB error otherwise.
"""
self._init_pool_if_needed()
def callee(session: Session):
return session.create_table(path=path, table_description=table_description, settings=settings)
return self._pool.retry_operation_sync(callee)
[docs]
def drop_table(
self,
path: str,
settings: Optional["settings_impl.BaseRequestSettings"] = None,
) -> "ydb.Operation":
"""
Drop a YDB table.
:param path: A table path
:param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
:return: Operation or YDB error otherwise.
"""
self._init_pool_if_needed()
def callee(session: Session):
return session.drop_table(path=path, settings=settings)
return self._pool.retry_operation_sync(callee)
[docs]
def alter_table(
self,
path: str,
add_columns: Optional[List["ydb.Column"]] = None,
drop_columns: Optional[List[str]] = None,
settings: Optional["settings_impl.BaseRequestSettings"] = None,
alter_attributes: Optional[Optional[Dict[str, str]]] = None,
add_indexes: Optional[List["ydb.TableIndex"]] = None,
drop_indexes: Optional[List[str]] = None,
set_ttl_settings: Optional["ydb.TtlSettings"] = None,
drop_ttl_settings: Optional[Any] = None,
add_column_families: Optional[List["ydb.ColumnFamily"]] = None,
alter_column_families: Optional[List["ydb.ColumnFamily"]] = None,
alter_storage_settings: Optional["ydb.StorageSettings"] = None,
set_compaction_policy: Optional[str] = None,
alter_partitioning_settings: Optional["ydb.PartitioningSettings"] = None,
set_key_bloom_filter: Optional["ydb.FeatureFlag"] = None,
set_read_replicas_settings: Optional["ydb.ReadReplicasSettings"] = None,
) -> "ydb.Operation":
"""
Alter a YDB table.
:param path: A table path
:param add_columns: List of ydb.Column to add
:param drop_columns: List of column names to drop
:param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
:param alter_attributes: Dict of attributes to alter
:param add_indexes: List of ydb.TableIndex to add
:param drop_indexes: List of index names to drop
:param set_ttl_settings: ydb.TtlSettings to set
:param drop_ttl_settings: Any to drop
:param add_column_families: List of ydb.ColumnFamily to add
:param alter_column_families: List of ydb.ColumnFamily to alter
:param alter_storage_settings: ydb.StorageSettings to alter
:param set_compaction_policy: Compaction policy
:param alter_partitioning_settings: ydb.PartitioningSettings to alter
:param set_key_bloom_filter: ydb.FeatureFlag to set key bloom filter
:return: Operation or YDB error otherwise.
"""
self._init_pool_if_needed()
def callee(session: Session):
return session.alter_table(
path=path,
add_columns=add_columns,
drop_columns=drop_columns,
settings=settings,
alter_attributes=alter_attributes,
add_indexes=add_indexes,
drop_indexes=drop_indexes,
set_ttl_settings=set_ttl_settings,
drop_ttl_settings=drop_ttl_settings,
add_column_families=add_column_families,
alter_column_families=alter_column_families,
alter_storage_settings=alter_storage_settings,
set_compaction_policy=set_compaction_policy,
alter_partitioning_settings=alter_partitioning_settings,
set_key_bloom_filter=set_key_bloom_filter,
set_read_replicas_settings=set_read_replicas_settings,
)
return self._pool.retry_operation_sync(callee)
[docs]
def describe_table(
self,
path: str,
settings: Optional["settings_impl.BaseRequestSettings"] = None,
) -> "ydb.TableSchemeEntry":
"""
Describe a YDB table.
:param path: A table path
:param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
:return: TableSchemeEntry or YDB error otherwise.
"""
self._init_pool_if_needed()
def callee(session: Session):
return session.describe_table(path=path, settings=settings)
return self._pool.retry_operation_sync(callee)
[docs]
def copy_table(
self,
source_path: str,
destination_path: str,
settings: Optional["settings_impl.BaseRequestSettings"] = None,
) -> "ydb.Operation":
"""
Copy a YDB table.
:param source_path: A table path
:param destination_path: Destination table path
:param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
:return: Operation or YDB error otherwise.
"""
self._init_pool_if_needed()
def callee(session: Session):
return session.copy_table(
source_path=source_path,
destination_path=destination_path,
settings=settings,
)
return self._pool.retry_operation_sync(callee)
[docs]
def copy_tables(
self,
source_destination_pairs: List[Tuple[str, str]],
settings: Optional["settings_impl.BaseRequestSettings"] = None,
) -> "ydb.Operation":
"""
Copy a YDB tables.
:param source_destination_pairs: List of tuples (source_path, destination_path)
:param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
:return: Operation or YDB error otherwise.
"""
self._init_pool_if_needed()
def callee(session: Session):
return session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings)
return self._pool.retry_operation_sync(callee)
[docs]
def rename_tables(
self,
rename_items: List[Tuple[str, str]],
settings: Optional["settings_impl.BaseRequestSettings"] = None,
) -> "ydb.Operation":
"""
Rename a YDB tables.
:param rename_items: List of tuples (current_name, desired_name)
:param settings: An instance of BaseRequestSettings that describes how rpc should be invoked.
:return: Operation or YDB error otherwise.
"""
self._init_pool_if_needed()
def callee(session: Session):
return session.rename_tables(rename_items=rename_items, settings=settings)
return self._pool.retry_operation_sync(callee)
def _make_index_description(index):
result = TableIndex(index.name).with_index_columns(*tuple(col for col in index.index_columns))
result.status = IndexStatus(index.status)
return result
class TableSchemeEntry(scheme.SchemeEntry):
def __init__(
self,
name,
owner,
type,
effective_permissions,
permissions,
size_bytes,
columns,
primary_key,
shard_key_bounds,
indexes,
table_stats,
ttl_settings,
attributes,
partitioning_settings,
column_families,
key_bloom_filter,
read_replicas_settings,
storage_settings,
*args,
**kwargs
):
super(TableSchemeEntry, self).__init__(
name, owner, type, effective_permissions, permissions, size_bytes, *args, **kwargs
)
self.primary_key = [pk for pk in primary_key]
self.columns = [Column(column.name, convert.type_to_native(column.type), column.family) for column in columns]
self.indexes = [_make_index_description(index) for index in indexes]
self.shard_key_ranges = []
self.column_families = []
self.key_bloom_filter = FeatureFlag(key_bloom_filter)
left_key_bound = None
for column_family in column_families:
self.column_families.append(
ColumnFamily()
.with_name(column_family.name)
.with_keep_in_memory(FeatureFlag(column_family.keep_in_memory))
.with_compression(Compression(column_family.compression))
)
if column_family.HasField("data"):
self.column_families[-1].with_data(StoragePool(column_family.data.media))
for shard_key_bound in shard_key_bounds:
# for next key range
key_bound_type = shard_key_bound.type
current_bound = convert.to_native_value(shard_key_bound)
self.shard_key_ranges.append(
KeyRange(
None if left_key_bound is None else KeyBound.inclusive(left_key_bound, key_bound_type),
KeyBound.exclusive(current_bound, key_bound_type),
)
)
left_key_bound = current_bound
assert isinstance(left_key_bound, tuple)
if len(shard_key_bounds) > 0:
self.shard_key_ranges.append(
KeyRange(
KeyBound.inclusive(left_key_bound, shard_key_bounds[-1].type),
None,
)
)
else:
self.shard_key_ranges.append(KeyRange(None, None))
self.read_replicas_settings = None
if read_replicas_settings is not None:
self.read_replicas_settings = ReadReplicasSettings()
for field in ("per_az_read_replicas_count", "any_az_read_replicas_count"):
if read_replicas_settings.WhichOneof("settings") == field:
setattr(
self.read_replicas_settings,
field,
getattr(read_replicas_settings, field),
)
self.storage_settings = None
if storage_settings is not None:
self.storage_settings = StorageSettings()
self.storage_settings.store_external_blobs = FeatureFlag(self.storage_settings.store_external_blobs)
if storage_settings.HasField("tablet_commit_log0"):
self.storage_settings.with_tablet_commit_log0(StoragePool(storage_settings.tablet_commit_log0.media))
if storage_settings.HasField("tablet_commit_log1"):
self.storage_settings.with_tablet_commit_log1(StoragePool(storage_settings.tablet_commit_log1.media))
if storage_settings.HasField("external"):
self.storage_settings.with_external(StoragePool(storage_settings.external.media))
self.partitioning_settings = None
if partitioning_settings is not None:
self.partitioning_settings = PartitioningSettings()
for field in (
"partitioning_by_size",
"partitioning_by_load",
"partition_size_mb",
"min_partitions_count",
"max_partitions_count",
):
setattr(
self.partitioning_settings,
field,
getattr(partitioning_settings, field),
)
self.ttl_settings = None
if ttl_settings is not None:
if ttl_settings.HasField("date_type_column"):
self.ttl_settings = TtlSettings().with_date_type_column(
ttl_settings.date_type_column.column_name,
ttl_settings.date_type_column.expire_after_seconds,
)
elif ttl_settings.HasField("value_since_unix_epoch"):
self.ttl_settings = TtlSettings().with_value_since_unix_epoch(
ttl_settings.value_since_unix_epoch.column_name,
ColumnUnit(ttl_settings.value_since_unix_epoch.column_unit),
ttl_settings.value_since_unix_epoch.expire_after_seconds,
)
self.table_stats = None
if table_stats is not None:
self.table_stats = TableStats()
if table_stats.partitions != 0:
self.table_stats = self.table_stats.with_partitions(table_stats.partitions)
if table_stats.store_size != 0:
self.table_stats = self.table_stats.with_store_size(table_stats.store_size)
self.attributes = attributes
class RenameItem:
def __init__(self, source_path, destination_path, replace_destination=False):
self._source_path = source_path
self._destination_path = destination_path
self._replace_destination = replace_destination
@property
def source_path(self):
return self._source_path
@property
def destination_path(self):
return self._destination_path
@property
def replace_destination(self):
return self._replace_destination
class BaseSession(ISession):
def __init__(self, driver, table_client_settings):
self._driver = driver
self._state = _session_impl.SessionState(table_client_settings)
def __lt__(self, other):
return self.session_id < other.session_id
def __eq__(self, other):
return self.session_id == other.session_id
@property
def session_id(self):
"""
Return session_id.
"""
return self._state.session_id
def initialized(self):
"""
Return True if session is successfully initialized with a session_id and False otherwise.
"""
return self._state.session_id is not None
def pending_query(self):
return self._state.pending_query()
def closing(self):
"""Returns True if session is closing."""
return self._state.closing()
def reset(self):
"""
Perform session state reset (that includes cleanup of the session_id, query cache, and etc.)
"""
return self._state.reset()
def read_table(
self,
path,
key_range=None,
columns=(),
ordered=False,
row_limit=None,
settings=None,
use_snapshot=None,
):
"""
Perform an read table request.
:param path: A path to the table
:param key_range: (optional) A KeyRange instance that describes a range to read. The KeyRange instance\
should include from_bound and/or to_bound. Each of the bounds (if provided) should specify a value of the\
key bound, and type of the key prefix. See an example above.
:param columns: (optional) An iterable with table columns to read.
:param ordered: (optional) A flag that indicates that result should be ordered.
:param row_limit: (optional) A number of rows to read.
:return: SyncResponseIterator instance
"""
request = _session_impl.read_table_request_factory(
self._state,
path,
key_range,
columns,
ordered,
row_limit,
use_snapshot=use_snapshot,
)
stream_it = self._driver(
request,
_apis.TableService.Stub,
_apis.TableService.StreamReadTable,
settings=settings,
)
return _utilities.SyncResponseIterator(stream_it, _session_impl.wrap_read_table_response)
def keep_alive(self, settings=None):
return self._driver(
_session_impl.keep_alive_request_factory(self._state),
_apis.TableService.Stub,
_apis.TableService.KeepAlive,
_session_impl.wrap_keep_alive_response,
settings,
(self._state, self),
self._state.endpoint,
)
def create(self, settings=None):
if self._state.session_id is not None:
return self
create_settings = settings_impl.BaseRequestSettings()
if settings is not None:
create_settings = settings.make_copy()
create_settings = create_settings.with_header("x-ydb-client-capabilities", "session-balancer")
return self._driver(
_apis.ydb_table.CreateSessionRequest(),
_apis.TableService.Stub,
_apis.TableService.CreateSession,
_session_impl.initialize_session,
create_settings,
(self._state, self),
self._state.endpoint,
)
def delete(self, settings=None):
return self._driver(
self._state.attach_request(_apis.ydb_table.DeleteSessionRequest()),
_apis.TableService.Stub,
_apis.TableService.DeleteSession,
_session_impl.cleanup_session,
settings,
(self._state, self),
self._state.endpoint,
)
def execute_scheme(self, yql_text, settings=None):
return self._driver(
_session_impl.execute_scheme_request_factory(self._state, yql_text),
_apis.TableService.Stub,
_apis.TableService.ExecuteSchemeQuery,
_session_impl.wrap_execute_scheme_result,
settings,
(self._state,),
self._state.endpoint,
)
def transaction(self, tx_mode=None, allow_split_transactions=None):
return TxContext(
self._driver,
self._state,
self,
tx_mode,
allow_split_transactions=allow_split_transactions,
)
def has_prepared(self, query):
return query in self._state
def prepare(self, query, settings=None):
data_query, _ = self._state.lookup(query)
if data_query is not None:
return data_query
return self._driver(
_session_impl.prepare_request_factory(self._state, query),
_apis.TableService.Stub,
_apis.TableService.PrepareDataQuery,
_session_impl.wrap_prepare_query_response,
settings,
(self._state, query),
self._state.endpoint,
)
def explain(self, yql_text, settings=None):
"""
Expiremental API.
:param yql_text:
:param settings:
:return:
"""
return self._driver(
_session_impl.explain_data_query_request_factory(self._state, yql_text),
_apis.TableService.Stub,
_apis.TableService.ExplainDataQuery,
_session_impl.wrap_explain_response,
settings,
(self._state,),
self._state.endpoint,
)
def create_table(self, path, table_description, settings=None):
"""
Create a YDB table.
:param path: A table path
:param table_description: A description of table to create. An instance TableDescription
:param settings: An instance of BaseRequestSettings that describes how rpc should invoked.
:return: A description of created scheme entry or error otherwise.
"""
return self._driver(
_session_impl.create_table_request_factory(self._state, path, table_description),
_apis.TableService.Stub,
_apis.TableService.CreateTable,
_session_impl.wrap_operation,
settings,
(self._driver,),
self._state.endpoint,
)
def drop_table(self, path, settings=None):
return self._driver(
self._state.attach_request(_apis.ydb_table.DropTableRequest(path=path)),
_apis.TableService.Stub,
_apis.TableService.DropTable,
_session_impl.wrap_operation,
settings,
(self._state,),
self._state.endpoint,
)
def alter_table(
self,
path,
add_columns=None,
drop_columns=None,
settings=None,
alter_attributes=None,
add_indexes=None,
drop_indexes=None,
set_ttl_settings=None,
drop_ttl_settings=None,
add_column_families=None,
alter_column_families=None,
alter_storage_settings=None,
set_compaction_policy=None,
alter_partitioning_settings=None,
set_key_bloom_filter=None,
set_read_replicas_settings=None,
):
return self._driver(
_session_impl.alter_table_request_factory(
self._state,
path,
add_columns,
drop_columns,
alter_attributes,
add_indexes,
drop_indexes,
set_ttl_settings,
drop_ttl_settings,
add_column_families,
alter_column_families,
alter_storage_settings,
set_compaction_policy,
alter_partitioning_settings,
set_key_bloom_filter,
set_read_replicas_settings,
),
_apis.TableService.Stub,
_apis.TableService.AlterTable,
_session_impl.AlterTableOperation,
settings,
(self._driver,),
self._state.endpoint,
)
def describe_table(self, path, settings=None):
"""
Returns a description of the table by provided path
:param path: A table path
:param settings: A request settings
:return: Description of a table
"""
return self._driver(
_session_impl.describe_table_request_factory(self._state, path, settings),
_apis.TableService.Stub,
_apis.TableService.DescribeTable,
_session_impl.wrap_describe_table_response,
settings,
(self._state, TableSchemeEntry),
self._state.endpoint,
)
def copy_table(self, source_path, destination_path, settings=None):
return self.copy_tables([(source_path, destination_path)], settings=settings)
def copy_tables(self, source_destination_pairs, settings=None):
return self._driver(
_session_impl.copy_tables_request_factory(self._state, source_destination_pairs),
_apis.TableService.Stub,
_apis.TableService.CopyTables,
_session_impl.wrap_operation,
settings,
(self._state,),
self._state.endpoint,
)
def rename_tables(self, rename_items, settings=None):
return self._driver(
_session_impl.rename_tables_request_factory(self._state, rename_items),
_apis.TableService.Stub,
_apis.TableService.RenameTables,
_session_impl.wrap_operation,
settings,
(self._state,),
self._state.endpoint,
)
[docs]
class Session(BaseSession):
[docs]
def async_read_table(
self,
path,
key_range=None,
columns=(),
ordered=False,
row_limit=None,
settings=None,
use_snapshot=None,
):
"""
Perform an read table request.
:param path: A path to the table
:param key_range: (optional) A KeyRange instance that describes a range to read. The KeyRange instance\
should include from_bound and/or to_bound. Each of the bounds (if provided) should specify a value of the\
key bound, and type of the key prefix. See an example above.
:param columns: (optional) An iterable with table columns to read.
:param ordered: (optional) A flag that indicates that result should be ordered.
:param row_limit: (optional) A number of rows to read.
:return: AsyncResponseIterator instance
"""
if interceptor is None:
raise RuntimeError("Async read table is not available due to import issues")
request = _session_impl.read_table_request_factory(
self._state,
path,
key_range,
columns,
ordered,
row_limit,
use_snapshot=use_snapshot,
)
stream_it = self._driver(
request,
_apis.TableService.Stub,
_apis.TableService.StreamReadTable,
settings=settings,
)
return _utilities.AsyncResponseIterator(stream_it, _session_impl.wrap_read_table_response)
[docs]
@_utilities.wrap_async_call_exceptions
def async_keep_alive(self, settings=None):
return self._driver.future(
_session_impl.keep_alive_request_factory(self._state),
_apis.TableService.Stub,
_apis.TableService.KeepAlive,
_session_impl.wrap_keep_alive_response,
settings,
(self._state, self),
self._state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_create(self, settings=None):
if self._state.session_id is not None:
return _utilities.wrap_result_in_future(self)
create_settings = settings_impl.BaseRequestSettings()
if settings is not None:
create_settings = settings.make_copy()
create_settings = create_settings.with_header("x-ydb-client-capabilities", "session-balancer")
return self._driver.future(
_apis.ydb_table.CreateSessionRequest(),
_apis.TableService.Stub,
_apis.TableService.CreateSession,
_session_impl.initialize_session,
create_settings,
(self._state, self),
self._state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_delete(self, settings=None):
return self._driver.future(
self._state.attach_request(_apis.ydb_table.DeleteSessionRequest()),
_apis.TableService.Stub,
_apis.TableService.DeleteSession,
_session_impl.cleanup_session,
settings,
(self._state, self),
self._state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_execute_scheme(self, yql_text, settings=None):
return self._driver.future(
_session_impl.execute_scheme_request_factory(self._state, yql_text),
_apis.TableService.Stub,
_apis.TableService.ExecuteSchemeQuery,
_session_impl.wrap_execute_scheme_result,
settings,
(self._state,),
self._state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_prepare(self, query, settings=None):
data_query, _ = self._state.lookup(query)
if data_query is not None:
return _utilities.wrap_result_in_future(data_query)
return self._driver.future(
_session_impl.prepare_request_factory(self._state, query),
_apis.TableService.Stub,
_apis.TableService.PrepareDataQuery,
_session_impl.wrap_prepare_query_response,
settings,
(
self._state,
query,
),
self._state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_create_table(self, path, table_description, settings=None):
return self._driver.future(
_session_impl.create_table_request_factory(self._state, path, table_description),
_apis.TableService.Stub,
_apis.TableService.CreateTable,
_session_impl.wrap_operation,
settings,
(self._driver,),
self._state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_drop_table(self, path, settings=None):
return self._driver.future(
self._state.attach_request(_apis.ydb_table.DropTableRequest(path=path)),
_apis.TableService.Stub,
_apis.TableService.DropTable,
_session_impl.wrap_operation,
settings,
(self._state,),
self._state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_alter_table(
self,
path,
add_columns=None,
drop_columns=None,
settings=None,
alter_attributes=None,
add_indexes=None,
drop_indexes=None,
set_ttl_settings=None,
drop_ttl_settings=None,
add_column_families=None,
alter_column_families=None,
alter_storage_settings=None,
set_compaction_policy=None,
alter_partitioning_settings=None,
set_key_bloom_filter=None,
set_read_replicas_settings=None,
):
return self._driver.future(
_session_impl.alter_table_request_factory(
self._state,
path,
add_columns,
drop_columns,
alter_attributes,
add_indexes,
drop_indexes,
set_ttl_settings,
drop_ttl_settings,
add_column_families,
alter_column_families,
alter_storage_settings,
set_compaction_policy,
alter_partitioning_settings,
set_key_bloom_filter,
set_read_replicas_settings,
),
_apis.TableService.Stub,
_apis.TableService.AlterTable,
_session_impl.AlterTableOperation,
settings,
(self._driver,),
self._state.endpoint,
)
[docs]
def async_copy_table(self, source_path, destination_path, settings=None):
return self.async_copy_tables([(source_path, destination_path)], settings=settings)
[docs]
@_utilities.wrap_async_call_exceptions
def async_copy_tables(self, source_destination_pairs, settings=None):
return self._driver.future(
_session_impl.copy_tables_request_factory(self._state, source_destination_pairs),
_apis.TableService.Stub,
_apis.TableService.CopyTables,
_session_impl.wrap_operation,
settings,
(self._state,),
self._state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_rename_tables(self, rename_tables, settings=None):
return self._driver.future(
_session_impl.rename_tables_request_factory(self._state, rename_tables),
_apis.TableService.Stub,
_apis.TableService.RenameTables,
_session_impl.wrap_operation,
settings,
(self._state,),
self._state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_describe_table(self, path, settings=None):
return self._driver.future(
_session_impl.describe_table_request_factory(self._state, path, settings),
_apis.TableService.Stub,
_apis.TableService.DescribeTable,
_session_impl.wrap_describe_table_response,
settings,
(self._state, TableSchemeEntry),
self._state.endpoint,
)
class ITxContext(abc.ABC):
@abstractmethod
def __init__(self, driver, session_state, session, tx_mode=None):
"""
An object that provides a simple transaction context manager that allows statements execution
in a transaction. You don't have to open transaction explicitly, because context manager encapsulates
transaction control logic, and opens new transaction if:
1) By explicit .begin();
2) On execution of a first statement, which is strictly recommended method, because that avoids
useless round trip
This context manager is not thread-safe, so you should not manipulate on it concurrently.
:param driver: A driver instance
:param session_state: A state of session
:param tx_mode: A transaction mode, which is a one from the following choices:
1) SerializableReadWrite() which is default mode;
2) OnlineReadOnly();
3) StaleReadOnly().
"""
pass
@abstractmethod
def __enter__(self):
"""
Enters a context manager and returns a session
:return: A session instance
"""
pass
@abstractmethod
def __exit__(self, *args, **kwargs):
"""
Closes a transaction context manager and rollbacks transaction if
it is not rolled back explicitly
"""
pass
@property
@abstractmethod
def session_id(self):
"""
A transaction's session id
:return: A transaction's session id
"""
pass
@property
@abstractmethod
def tx_id(self):
"""
Returns a id of open transaction or None otherwise
:return: A id of open transaction or None otherwise
"""
pass
@abstractmethod
def execute(self, query, parameters=None, commit_tx=False, settings=None):
"""
Sends a query (yql text or an instance of DataQuery) to be executed with parameters.
Execution with parameters supported only for DataQuery instances and is not supported yql text queries.
:param query: A query, yql text or DataQuery instance.
:param parameters: A dictionary with parameters values.
:param commit_tx: A special flag that allows transaction commit
:param settings: An additional request settings
:return: A result sets or exception in case of execution errors
"""
pass
@abstractmethod
def commit(self, settings=None):
"""
Calls commit on a transaction if it is open otherwise is no-op. If transaction execution
failed then this method raises PreconditionFailed.
:param settings: A request settings
:return: A committed transaction or exception if commit is failed
"""
pass
@abstractmethod
def rollback(self, settings=None):
"""
Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
failed then this method raises PreconditionFailed.
:param settings: A request settings
:return: A rolled back transaction or exception if rollback is failed
"""
pass
@abstractmethod
def begin(self, settings=None):
"""
Explicitly begins a transaction
:param settings: A request settings
:return: An open transaction
"""
pass
class BaseTxContext(ITxContext):
__slots__ = (
"_tx_state",
"_session_state",
"_driver",
"session",
"_finished",
"_allow_split_transactions",
)
_COMMIT = "commit"
_ROLLBACK = "rollback"
def __init__(self, driver, session_state, session, tx_mode=None, *, allow_split_transactions=None):
"""
An object that provides a simple transaction context manager that allows statements execution
in a transaction. You don't have to open transaction explicitly, because context manager encapsulates
transaction control logic, and opens new transaction if:
1) By explicit .begin() and .async_begin() methods;
2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip
This context manager is not thread-safe, so you should not manipulate on it concurrently.
:param driver: A driver instance
:param session_state: A state of session
:param tx_mode: A transaction mode, which is a one from the following choices:
1) SerializableReadWrite() which is default mode;
2) OnlineReadOnly();
3) StaleReadOnly().
"""
self._driver = driver
tx_mode = SerializableReadWrite() if tx_mode is None else tx_mode
self._tx_state = _tx_ctx_impl.TxState(tx_mode)
self._session_state = session_state
self.session = session
self._finished = ""
self._allow_split_transactions = allow_split_transactions
def __enter__(self):
"""
Enters a context manager and returns a session
:return: A session instance
"""
return self
def __exit__(self, *args, **kwargs):
"""
Closes a transaction context manager and rollbacks transaction if
it is not rolled back explicitly
"""
if self._tx_state.tx_id is not None:
# It's strictly recommended to close transactions directly
# by using commit_tx=True flag while executing statement or by
# .commit() or .rollback() methods, but here we trying to do best
# effort to avoid useless open transactions
logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id)
try:
self.rollback()
except issues.Error:
logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id)
self._tx_state.tx_id = None
@property
def session_id(self):
"""
A transaction's session id
:return: A transaction's session id
"""
return self._session_state.session_id
@property
def tx_id(self):
"""
Returns a id of open transaction or None otherwise
:return: A id of open transaction or None otherwise
"""
return self._tx_state.tx_id
def execute(self, query, parameters=None, commit_tx=False, settings=None):
"""
Sends a query (yql text or an instance of DataQuery) to be executed with parameters.
Execution with parameters supported only for DataQuery instances and is not supported yql text queries.
:param query: A query, yql text or DataQuery instance.
:param parameters: A dictionary with parameters values.
:param commit_tx: A special flag that allows transaction commit
:param settings: An additional request settings
:return: A result sets or exception in case of execution errors
"""
self._check_split()
if commit_tx:
self._set_finish(self._COMMIT)
return self._driver(
_tx_ctx_impl.execute_request_factory(
self._session_state,
self._tx_state,
query,
parameters,
commit_tx,
settings,
),
_apis.TableService.Stub,
_apis.TableService.ExecuteDataQuery,
_tx_ctx_impl.wrap_result_and_tx_id,
settings,
(self._session_state, self._tx_state, query),
self._session_state.endpoint,
)
def commit(self, settings=None):
"""
Calls commit on a transaction if it is open otherwise is no-op. If transaction execution
failed then this method raises PreconditionFailed.
:param settings: A request settings
:return: A committed transaction or exception if commit is failed
"""
self._set_finish(self._COMMIT)
if self._tx_state.tx_id is None and not self._tx_state.dead:
return self
return self._driver(
_tx_ctx_impl.commit_request_factory(self._session_state, self._tx_state),
_apis.TableService.Stub,
_apis.TableService.CommitTransaction,
_tx_ctx_impl.wrap_result_on_rollback_or_commit_tx,
settings,
(self._session_state, self._tx_state, self),
self._session_state.endpoint,
)
def rollback(self, settings=None):
"""
Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
failed then this method raises PreconditionFailed.
:param settings: A request settings
:return: A rolled back transaction or exception if rollback is failed
"""
self._set_finish(self._ROLLBACK)
if self._tx_state.tx_id is None and not self._tx_state.dead:
return self
return self._driver(
_tx_ctx_impl.rollback_request_factory(self._session_state, self._tx_state),
_apis.TableService.Stub,
_apis.TableService.RollbackTransaction,
_tx_ctx_impl.wrap_result_on_rollback_or_commit_tx,
settings,
(self._session_state, self._tx_state, self),
self._session_state.endpoint,
)
def begin(self, settings=None):
"""
Explicitly begins a transaction
:param settings: A request settings
:return: An open transaction
"""
if self._tx_state.tx_id is not None:
return self
self._check_split()
return self._driver(
_tx_ctx_impl.begin_request_factory(self._session_state, self._tx_state),
_apis.TableService.Stub,
_apis.TableService.BeginTransaction,
_tx_ctx_impl.wrap_tx_begin_response,
settings,
(self._session_state, self._tx_state, self),
self._session_state.endpoint,
)
def _set_finish(self, val):
self._check_split(val)
self._finished = val
def _check_split(self, allow=""):
"""
Deny all operaions with transaction after commit/rollback.
Exception: double commit and double rollbacks, because it is safe
"""
allow_split_transaction = (
self._allow_split_transactions
if self._allow_split_transactions is not None
else _default_allow_split_transaction
)
if allow_split_transaction:
return
if self._finished != "" and self._finished != allow:
raise RuntimeError("Any operation with finished transaction is denied")
[docs]
class TxContext(BaseTxContext):
[docs]
@_utilities.wrap_async_call_exceptions
def async_execute(self, query, parameters=None, commit_tx=False, settings=None):
"""
Sends a query (yql text or an instance of DataQuery) to be executed with parameters.
Execution with parameters supported only for DataQuery instances and not supported for YQL text.
:param query: A query: YQL text or DataQuery instance. E
:param parameters: A dictionary with parameters values.
:param commit_tx: A special flag that allows transaction commit
:param settings: A request settings (an instance of ExecDataQuerySettings)
:return: A future of query execution
"""
self._check_split()
return self._driver.future(
_tx_ctx_impl.execute_request_factory(
self._session_state,
self._tx_state,
query,
parameters,
commit_tx,
settings,
),
_apis.TableService.Stub,
_apis.TableService.ExecuteDataQuery,
_tx_ctx_impl.wrap_result_and_tx_id,
settings,
(
self._session_state,
self._tx_state,
query,
),
self._session_state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_commit(self, settings=None):
"""
Calls commit on a transaction if it is open otherwise is no-op. If transaction execution
failed then this method raises PreconditionFailed.
:param settings: A request settings (an instance of BaseRequestSettings)
:return: A future of commit call
"""
self._set_finish(self._COMMIT)
if self._tx_state.tx_id is None and not self._tx_state.dead:
return _utilities.wrap_result_in_future(self)
return self._driver.future(
_tx_ctx_impl.commit_request_factory(self._session_state, self._tx_state),
_apis.TableService.Stub,
_apis.TableService.CommitTransaction,
_tx_ctx_impl.wrap_result_on_rollback_or_commit_tx,
settings,
(self._session_state, self._tx_state, self),
self._session_state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_rollback(self, settings=None):
"""
Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
failed then this method raises PreconditionFailed.
:param settings: A request settings
:return: A future of rollback call
"""
self._set_finish(self._ROLLBACK)
if self._tx_state.tx_id is None and not self._tx_state.dead:
return _utilities.wrap_result_in_future(self)
return self._driver.future(
_tx_ctx_impl.rollback_request_factory(self._session_state, self._tx_state),
_apis.TableService.Stub,
_apis.TableService.RollbackTransaction,
_tx_ctx_impl.wrap_result_on_rollback_or_commit_tx,
settings,
(self._session_state, self._tx_state, self),
self._session_state.endpoint,
)
[docs]
@_utilities.wrap_async_call_exceptions
def async_begin(self, settings=None):
"""
Explicitly begins a transaction
:param settings: A request settings
:return: A future of begin call
"""
if self._tx_state.tx_id is not None:
return _utilities.wrap_result_in_future(self)
self._check_split()
return self._driver.future(
_tx_ctx_impl.begin_request_factory(self._session_state, self._tx_state),
_apis.TableService.Stub,
_apis.TableService.BeginTransaction,
_tx_ctx_impl.wrap_tx_begin_response,
settings,
(self._session_state, self._tx_state, self),
self._session_state.endpoint,
)
[docs]
class SessionPool(object):
def __init__(
self,
driver,
size=100,
workers_threads_count=4,
initializer=None,
min_pool_size=0,
):
"""
An object that encapsulates session creation, deletion and etc. and maintains
a pool of active sessions of specified size
:param driver: A Driver instance
:param size: A maximum number of sessions to maintain in the pool
"""
self._logger = logger.getChild(self.__class__.__name__)
self._pool_impl = _sp_impl.SessionPoolImpl(
self._logger,
driver,
size,
workers_threads_count,
initializer,
min_pool_size,
)
if hasattr(driver, "_driver_config"):
self.tracer = driver._driver_config.tracer
else:
self.tracer = ydb.Tracer(None)
[docs]
def retry_operation_sync(self, callee, retry_settings=None, *args, **kwargs):
retry_settings = RetrySettings() if retry_settings is None else retry_settings
def wrapped_callee():
with self.checkout(timeout=retry_settings.get_session_client_timeout) as session:
return callee(session, *args, **kwargs)
return retry_operation_sync(wrapped_callee, retry_settings)
@property
def active_size(self):
return self._pool_impl.active_size
@property
def free_size(self):
return self._pool_impl.free_size
@property
def busy_size(self):
return self._pool_impl.busy_size
@property
def max_size(self):
return self._pool_impl.max_size
@property
def waiters_count(self):
return self._pool_impl.waiters_count
[docs]
@tracing.with_trace()
def subscribe(self):
return self._pool_impl.subscribe()
[docs]
@tracing.with_trace()
def unsubscribe(self, waiter):
return self._pool_impl.unsubscribe(waiter)
[docs]
@tracing.with_trace()
def acquire(self, blocking=True, timeout=None):
return self._pool_impl.acquire(blocking, timeout)
[docs]
@tracing.with_trace()
def release(self, session):
return self._pool_impl.put(session)
[docs]
def async_checkout(self):
"""
Returns a context manager that asynchronously checkouts a session from the pool.
"""
return AsyncSessionCheckout(self)
[docs]
def checkout(self, blocking=True, timeout=None):
return SessionCheckout(self, blocking, timeout)
[docs]
def stop(self, timeout=None):
self._pool_impl.stop(timeout)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
class AsyncSessionCheckout(object):
__slots__ = ("subscription", "pool")
def __init__(self, pool):
"""
A context manager that asynchronously checkouts a session for the specified pool
and returns it on manager exit.
:param pool: A SessionPool instance.
"""
self.pool = pool
self.subscription = None
def __enter__(self):
self.subscription = self.pool.subscribe()
return self.subscription
def __exit__(self, exc_type, exc_val, exc_tb):
self.pool.unsubscribe(self.subscription)
class SessionCheckout(object):
__slots__ = ("_acquired", "_pool", "_blocking", "_timeout")
def __init__(self, pool, blocking, timeout):
"""
A context manager that checkouts a session from the specified pool and
returns it on manager exit.
:param pool: A SessionPool instance
:param blocking: A flag that specifies that session acquire method should blocks
:param timeout: A timeout in seconds for session acquire
"""
self._pool = pool
self._acquired = None
self._blocking = blocking
self._timeout = timeout
def __enter__(self):
self._acquired = self._pool.acquire(self._blocking, self._timeout)
return self._acquired
def __exit__(self, exc_type, exc_val, exc_tb):
if self._acquired is not None:
self._pool.release(self._acquired)