Source code for ydb.query.session

import abc
import enum
import logging
import threading
from typing import (
    Iterable,
    Optional,
)

from . import base

from .. import _apis, issues, _utilities
from ..settings import BaseRequestSettings
from ..connection import _RpcState as RpcState
from .._grpc.grpcwrapper import common_utils
from .._grpc.grpcwrapper import ydb_query as _ydb_query
from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public

from .transaction import QueryTxContext


logger = logging.getLogger(__name__)


DEFAULT_ATTACH_FIRST_RESP_TIMEOUT = 600
DEFAULT_ATTACH_LONG_TIMEOUT = 31536000  # year


class QuerySessionStateEnum(enum.Enum):
    NOT_INITIALIZED = "NOT_INITIALIZED"
    CREATED = "CREATED"
    CLOSED = "CLOSED"


class QuerySessionStateHelper(abc.ABC):
    _VALID_TRANSITIONS = {
        QuerySessionStateEnum.NOT_INITIALIZED: [QuerySessionStateEnum.CREATED],
        QuerySessionStateEnum.CREATED: [QuerySessionStateEnum.CLOSED],
        QuerySessionStateEnum.CLOSED: [],
    }

    _READY_TO_USE = [
        QuerySessionStateEnum.CREATED,
    ]

    @classmethod
    def valid_transition(cls, before: QuerySessionStateEnum, after: QuerySessionStateEnum) -> bool:
        return after in cls._VALID_TRANSITIONS[before]

    @classmethod
    def ready_to_use(cls, state: QuerySessionStateEnum) -> bool:
        return state in cls._READY_TO_USE


class QuerySessionState(base.IQuerySessionState):
    _session_id: Optional[str] = None
    _node_id: Optional[int] = None
    _attached: bool = False
    _settings: Optional[base.QueryClientSettings] = None
    _state: QuerySessionStateEnum = QuerySessionStateEnum.NOT_INITIALIZED

    def __init__(self, settings: base.QueryClientSettings = None):
        self._settings = settings

    def reset(self) -> None:
        self._session_id = None
        self._node_id = None
        self._attached = False

    @property
    def session_id(self) -> Optional[str]:
        return self._session_id

    def set_session_id(self, session_id: str) -> "QuerySessionState":
        self._session_id = session_id
        return self

    @property
    def node_id(self) -> Optional[int]:
        return self._node_id

    def set_node_id(self, node_id: int) -> "QuerySessionState":
        self._node_id = node_id
        return self

    @property
    def attached(self) -> bool:
        return self._attached

    def set_attached(self, attached: bool) -> "QuerySessionState":
        self._attached = attached

    def _check_invalid_transition(self, target: QuerySessionStateEnum) -> None:
        if not QuerySessionStateHelper.valid_transition(self._state, target):
            raise RuntimeError(f"Session could not be moved from {self._state.value} to {target.value}")

    def _change_state(self, target: QuerySessionStateEnum) -> None:
        self._check_invalid_transition(target)
        self._state = target

    def _check_session_ready_to_use(self) -> None:
        if not QuerySessionStateHelper.ready_to_use(self._state):
            raise RuntimeError(f"Session is not ready to use, current state: {self._state.value}")

    def _already_in(self, target) -> bool:
        return self._state == target


def wrapper_create_session(
    rpc_state: RpcState,
    response_pb: _apis.ydb_query.CreateSessionResponse,
    session_state: QuerySessionState,
    session: "BaseQuerySession",
) -> "BaseQuerySession":
    message = _ydb_query.CreateSessionResponse.from_proto(response_pb)
    issues._process_response(message.status)
    session_state.set_session_id(message.session_id).set_node_id(message.node_id)
    return session


def wrapper_delete_session(
    rpc_state: RpcState,
    response_pb: _apis.ydb_query.DeleteSessionResponse,
    session_state: QuerySessionState,
    session: "BaseQuerySession",
) -> "BaseQuerySession":
    message = _ydb_query.DeleteSessionResponse.from_proto(response_pb)
    issues._process_response(message.status)
    session_state.reset()
    session_state._change_state(QuerySessionStateEnum.CLOSED)
    return session


class BaseQuerySession:
    _driver: common_utils.SupportedDriverType
    _settings: base.QueryClientSettings
    _state: QuerySessionState

    def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None):
        self._driver = driver
        self._settings = self._get_client_settings(driver, settings)
        self._state = QuerySessionState(settings)
        self._attach_settings: BaseRequestSettings = (
            BaseRequestSettings()
            .with_operation_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
            .with_cancel_after(DEFAULT_ATTACH_LONG_TIMEOUT)
            .with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
        )

    def _get_client_settings(
        self,
        driver: common_utils.SupportedDriverType,
        settings: Optional[base.QueryClientSettings] = None,
    ) -> base.QueryClientSettings:
        if settings is not None:
            return settings
        if driver._driver_config.query_client_settings is not None:
            return driver._driver_config.query_client_settings
        return base.QueryClientSettings()

    def _create_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
        return self._driver(
            _apis.ydb_query.CreateSessionRequest(),
            _apis.QueryService.Stub,
            _apis.QueryService.CreateSession,
            wrap_result=wrapper_create_session,
            wrap_args=(self._state, self),
            settings=settings,
        )

    def _delete_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
        return self._driver(
            _apis.ydb_query.DeleteSessionRequest(session_id=self._state.session_id),
            _apis.QueryService.Stub,
            _apis.QueryService.DeleteSession,
            wrap_result=wrapper_delete_session,
            wrap_args=(self._state, self),
            settings=settings,
        )

    def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]:
        return self._driver(
            _apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id),
            _apis.QueryService.Stub,
            _apis.QueryService.AttachSession,
            settings=self._attach_settings,
        )

    def _execute_call(
        self,
        query: str,
        commit_tx: bool = False,
        syntax: base.QuerySyntax = None,
        exec_mode: base.QueryExecMode = None,
        parameters: dict = None,
        concurrent_result_sets: bool = False,
        settings: Optional[BaseRequestSettings] = None,
    ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
        request = base.create_execute_query_request(
            query=query,
            session_id=self._state.session_id,
            commit_tx=commit_tx,
            tx_mode=None,
            tx_id=None,
            syntax=syntax,
            exec_mode=exec_mode,
            parameters=parameters,
            concurrent_result_sets=concurrent_result_sets,
        )

        return self._driver(
            request.to_proto(),
            _apis.QueryService.Stub,
            _apis.QueryService.ExecuteQuery,
            settings=settings,
        )


[docs] class QuerySession(BaseQuerySession): """Session object for Query Service. It is not recommended to control session's lifecycle manually - use a QuerySessionPool is always a better choise. """ _stream = None def _attach(self, first_resp_timeout: int = DEFAULT_ATTACH_FIRST_RESP_TIMEOUT) -> None: self._stream = self._attach_call() status_stream = _utilities.SyncResponseIterator( self._stream, lambda response: common_utils.ServerStatus.from_proto(response), ) try: first_response = _utilities.get_first_message_with_timeout( status_stream, first_resp_timeout, ) if first_response.status != issues.StatusCode.SUCCESS: raise RuntimeError("Failed to attach session") except Exception as e: self._state.reset() status_stream.cancel() raise e self._state.set_attached(True) self._state._change_state(QuerySessionStateEnum.CREATED) threading.Thread( target=self._check_session_status_loop, args=(status_stream,), name="attach stream thread", daemon=True, ).start() def _check_session_status_loop(self, status_stream: _utilities.SyncResponseIterator) -> None: try: for status in status_stream: if status.status != issues.StatusCode.SUCCESS: self._state.reset() self._state._change_state(QuerySessionStateEnum.CLOSED) except Exception: if not self._state._already_in(QuerySessionStateEnum.CLOSED): self._state.reset() self._state._change_state(QuerySessionStateEnum.CLOSED)
[docs] def delete(self, settings: Optional[BaseRequestSettings] = None) -> None: """Deletes a Session of Query Service on server side and releases resources. :return: None """ if self._state._already_in(QuerySessionStateEnum.CLOSED): return self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED) self._delete_call(settings=settings) self._stream.cancel()
[docs] def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySession": """Creates a Session of Query Service on server side and attaches it. :return: QuerySession object. """ if self._state._already_in(QuerySessionStateEnum.CREATED): return self._state._check_invalid_transition(QuerySessionStateEnum.CREATED) self._create_call(settings=settings) self._attach() return self
[docs] def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContext: """Creates a transaction context manager with specified transaction mode. :param tx_mode: Transaction mode, which is a one from the following choises: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); 4) QueryStaleReadOnly(). :return transaction context manager. """ self._state._check_session_ready_to_use() tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() return QueryTxContext( self._driver, self._state, self, tx_mode, )
[docs] def execute( self, query: str, parameters: dict = None, syntax: base.QuerySyntax = None, exec_mode: base.QueryExecMode = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, ) -> base.SyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. :param syntax: Syntax of the query, which is a one from the following choises: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; :return: Iterator with result sets """ self._state._check_session_ready_to_use() stream_it = self._execute_call( query=query, commit_tx=True, syntax=syntax, exec_mode=exec_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, settings=settings, ) return base.SyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( rpc_state=None, response_pb=resp, session_state=self._state, settings=self._settings, ), )