# Source code for ydb.aio.query.session

import asyncio

from typing import (
    Optional,
)

from .base import AsyncResponseContextIterator
from .transaction import QueryTxContext
from .. import _utilities
from ... import issues
from ...settings import BaseRequestSettings
from ..._grpc.grpcwrapper import common_utils
from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public

from ...query import base
from ...query.session import (
    BaseQuerySession,
    DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
    QuerySessionStateEnum,
)


class QuerySession(BaseQuerySession):
    """Session object for Query Service.

    Controlling a session's lifecycle manually is not recommended; using a
    QuerySessionPool is always a better choice.
    """

    _loop: asyncio.AbstractEventLoop
    _status_stream: Optional[_utilities.AsyncResponseIterator] = None
    # Strong reference to the background status-watching task. The event loop
    # only keeps weak references to tasks, so the task is kept here to make
    # sure it is not garbage-collected while it is still running.
    _status_task: Optional[asyncio.Task] = None

    def __init__(
        self,
        driver: common_utils.SupportedDriverType,
        settings: Optional[base.QueryClientSettings] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        """Initialize the session.

        :param driver: Driver forwarded to the base session constructor.
        :param settings: Optional query client settings forwarded to the
            base session constructor.
        :param loop: Event loop used to schedule the background status task.
            When omitted, the currently running loop is used, so the
            constructor must then be called from within a running loop.
        """
        super().__init__(driver, settings)
        self._loop = loop if loop is not None else asyncio.get_running_loop()

    async def _attach(self) -> None:
        """Open the attach stream and start watching the session status.

        Waits for the first message of the stream (bounded by
        DEFAULT_ATTACH_FIRST_RESP_TIMEOUT). On success the session state is
        marked as attached and switched to CREATED, and a background task
        that follows further status messages is scheduled.

        :raises RuntimeError: If the first status message is not SUCCESS.
            Any other exception raised while waiting for the first message
            is propagated as well; in both cases the state is reset and the
            status stream is cancelled first.
        """
        self._stream = await self._attach_call()
        self._status_stream = _utilities.AsyncResponseIterator(
            self._stream,
            lambda response: common_utils.ServerStatus.from_proto(response),
        )

        try:
            first_response = await _utilities.get_first_message_with_timeout(
                self._status_stream,
                DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
            )
            if first_response.status != issues.StatusCode.SUCCESS:
                raise RuntimeError("Failed to attach session")
        except Exception:
            # Roll back to a clean state before propagating the failure.
            self._state.reset()
            self._status_stream.cancel()
            raise

        self._state.set_attached(True)
        self._state._change_state(QuerySessionStateEnum.CREATED)

        # Keep the returned task referenced so it cannot disappear
        # mid-execution.
        self._status_task = self._loop.create_task(
            self._check_session_status_loop(),
            name="check session status task",
        )

    async def _check_session_status_loop(self) -> None:
        """Follow the status stream and close the session state on failure.

        Every non-SUCCESS status resets the state and moves it to CLOSED.
        If iterating the stream raises, the state is moved to CLOSED too,
        unless it is already there.
        """
        try:
            async for status in self._status_stream:
                if status.status != issues.StatusCode.SUCCESS:
                    self._state.reset()
                    self._state._change_state(QuerySessionStateEnum.CLOSED)
        except Exception:
            # Deliberate best-effort: a broken stream is treated as a closed
            # session instead of an error surfaced from a background task.
            if not self._state._already_in(QuerySessionStateEnum.CLOSED):
                self._state.reset()
                self._state._change_state(QuerySessionStateEnum.CLOSED)

    async def delete(self, settings: Optional[BaseRequestSettings] = None) -> None:
        """Delete a Session of Query Service on server side and release resources.

        Does nothing when the session is already in the CLOSED state.

        :param settings: Optional request settings passed to the delete call.
        :return: None
        """
        if self._state._already_in(QuerySessionStateEnum.CLOSED):
            return

        self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED)
        await self._delete_call(settings=settings)
        self._stream.cancel()

    async def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySession":
        """Create a Session of Query Service on server side and attach it.

        Calling this on a session that is already in the CREATED state is a
        no-op that still returns the session itself.

        :param settings: Optional request settings passed to the create call.
        :return: QuerySession object.
        """
        if self._state._already_in(QuerySessionStateEnum.CREATED):
            # Return self (not None) so ``s = await s.create()`` stays valid
            # and the declared return type is honoured.
            return self

        self._state._check_invalid_transition(QuerySessionStateEnum.CREATED)
        await self._create_call(settings=settings)
        await self._attach()

        return self

    def transaction(self, tx_mode=None) -> QueryTxContext:
        """Create a transaction context bound to this session.

        :param tx_mode: Transaction mode; when falsy, a
            QuerySerializableReadWrite mode is used.
        :return: QueryTxContext built on this session and its state.
        """
        self._state._check_session_ready_to_use()

        tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()

        return QueryTxContext(
            self._driver,
            self._state,
            self,
            tx_mode,
        )

    async def execute(
        self,
        query: str,
        parameters: Optional[dict] = None,
        syntax: Optional[base.QuerySyntax] = None,
        exec_mode: Optional[base.QueryExecMode] = None,
        concurrent_result_sets: bool = False,
        settings: Optional[BaseRequestSettings] = None,
    ) -> AsyncResponseContextIterator:
        """Send a query to Query Service.

        The underlying execute call is issued with ``commit_tx=True``.

        :param query: (YQL or SQL text) to be executed.
        :param parameters: dict with parameters and YDB types.
        :param syntax: Syntax of the query, which is one of the following choices:
            1) QuerySyntax.YQL_V1, which is default;
            2) QuerySyntax.PG.
        :param exec_mode: Query execution mode forwarded to the execute call.
        :param concurrent_result_sets: A flag to allow YDB mix parts of different
            result sets. Default is False.
        :param settings: Optional request settings passed to the execute call.
        :return: Iterator with result sets.
        """
        self._state._check_session_ready_to_use()

        stream_it = await self._execute_call(
            query=query,
            commit_tx=True,
            syntax=syntax,
            exec_mode=exec_mode,
            parameters=parameters,
            concurrent_result_sets=concurrent_result_sets,
            settings=settings,
        )

        return AsyncResponseContextIterator(
            stream_it,
            lambda resp: base.wrap_execute_query_response(
                rpc_state=None,
                response_pb=resp,
                session_state=self._state,
                settings=self._settings,
            ),
        )