import abc
import enum
import functools
import typing
from typing import (
Optional,
)
from .._grpc.grpcwrapper import ydb_query
from .._grpc.grpcwrapper.ydb_query_public_types import (
BaseQueryTxMode,
)
from ..connection import _RpcState as RpcState
from .. import convert
from .. import issues
from .. import _utilities
from .. import _apis
if typing.TYPE_CHECKING:
from .transaction import BaseQueryTxContext
class QuerySyntax(enum.IntEnum):
UNSPECIFIED = 0
YQL_V1 = 1
PG = 2
class QueryExecMode(enum.IntEnum):
UNSPECIFIED = 0
PARSE = 10
VALIDATE = 20
EXPLAIN = 30
EXECUTE = 50
class StatsMode(enum.IntEnum):
UNSPECIFIED = 0
NONE = 10
BASIC = 20
FULL = 30
PROFILE = 40
class SyncResponseContextIterator(_utilities.SyncResponseIterator):
def __enter__(self) -> "SyncResponseContextIterator":
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# To close stream on YDB it is necessary to scroll through it to the end
for _ in self:
pass
[docs]
class QueryClientSettings:
def __init__(self):
self._native_datetime_in_result_sets = True
self._native_date_in_result_sets = True
self._native_json_in_result_sets = True
self._native_interval_in_result_sets = True
self._native_timestamp_in_result_sets = True
[docs]
def with_native_timestamp_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
self._native_timestamp_in_result_sets = enabled
return self
[docs]
def with_native_interval_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
self._native_interval_in_result_sets = enabled
return self
[docs]
def with_native_json_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
self._native_json_in_result_sets = enabled
return self
[docs]
def with_native_date_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
self._native_date_in_result_sets = enabled
return self
[docs]
def with_native_datetime_in_result_sets(self, enabled: bool) -> "QueryClientSettings":
self._native_datetime_in_result_sets = enabled
return self
class IQuerySessionState(abc.ABC):
def __init__(self, settings: Optional[QueryClientSettings] = None):
pass
@abc.abstractmethod
def reset(self) -> None:
pass
@property
@abc.abstractmethod
def session_id(self) -> Optional[str]:
pass
@abc.abstractmethod
def set_session_id(self, session_id: str) -> "IQuerySessionState":
pass
@property
@abc.abstractmethod
def node_id(self) -> Optional[int]:
pass
@abc.abstractmethod
def set_node_id(self, node_id: int) -> "IQuerySessionState":
pass
@property
@abc.abstractmethod
def attached(self) -> bool:
pass
@abc.abstractmethod
def set_attached(self, attached: bool) -> "IQuerySessionState":
pass
def create_execute_query_request(
query: str,
session_id: str,
tx_id: Optional[str],
commit_tx: Optional[bool],
tx_mode: Optional[BaseQueryTxMode],
syntax: Optional[QuerySyntax],
exec_mode: Optional[QueryExecMode],
parameters: Optional[dict],
concurrent_result_sets: Optional[bool],
) -> ydb_query.ExecuteQueryRequest:
syntax = QuerySyntax.YQL_V1 if not syntax else syntax
exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode
stats_mode = StatsMode.NONE # TODO: choise is not supported yet
tx_control = None
if not tx_id and not tx_mode:
tx_control = None
elif tx_id:
tx_control = ydb_query.TransactionControl(
tx_id=tx_id,
commit_tx=commit_tx,
begin_tx=None,
)
else:
tx_control = ydb_query.TransactionControl(
begin_tx=ydb_query.TransactionSettings(
tx_mode=tx_mode,
),
commit_tx=commit_tx,
tx_id=None,
)
return ydb_query.ExecuteQueryRequest(
session_id=session_id,
query_content=ydb_query.QueryContent.from_public(
query=query,
syntax=syntax,
),
tx_control=tx_control,
exec_mode=exec_mode,
parameters=parameters,
concurrent_result_sets=concurrent_result_sets,
stats_mode=stats_mode,
)
def bad_session_handler(func):
@functools.wraps(func)
def decorator(rpc_state, response_pb, session_state: IQuerySessionState, *args, **kwargs):
try:
return func(rpc_state, response_pb, session_state, *args, **kwargs)
except issues.BadSession:
session_state.reset()
raise
return decorator
@bad_session_handler
def wrap_execute_query_response(
rpc_state: RpcState,
response_pb: _apis.ydb_query.ExecuteQueryResponsePart,
session_state: IQuerySessionState,
tx: Optional["BaseQueryTxContext"] = None,
commit_tx: Optional[bool] = False,
settings: Optional[QueryClientSettings] = None,
) -> convert.ResultSet:
issues._process_response(response_pb)
if tx and commit_tx:
tx._move_to_commited()
elif tx and response_pb.tx_meta and not tx.tx_id:
tx._move_to_beginned(response_pb.tx_meta.id)
return convert.ResultSet.from_message(response_pb.result_set, settings)