Source code for ydb.aio.query.transaction

import logging
from typing import (
    Optional,
)

from .base import AsyncResponseContextIterator
from ... import issues
from ...settings import BaseRequestSettings
from ...query import base
from ...query.transaction import (
    BaseQueryTxContext,
    QueryTxStateEnum,
)

logger = logging.getLogger(__name__)


[docs] class QueryTxContext(BaseQueryTxContext): async def __aenter__(self) -> "QueryTxContext": """ Enters a context manager and returns a transaction :return: A transaction instance """ return self async def __aexit__(self, *args, **kwargs): """ Closes a transaction context manager and rollbacks transaction if it is not finished explicitly """ await self._ensure_prev_stream_finished() if self._tx_state._state == QueryTxStateEnum.BEGINED: # It's strictly recommended to close transactions directly # by using commit_tx=True flag while executing statement or by # .commit() or .rollback() methods, but here we trying to do best # effort to avoid useless open transactions logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id) try: await self.rollback() except issues.Error: logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id) async def _ensure_prev_stream_finished(self) -> None: if self._prev_stream is not None: async with self._prev_stream: pass self._prev_stream = None
[docs] async def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContext": """Explicitly begins a transaction :param settings: An additional request settings BaseRequestSettings; :return: None or exception if begin is failed """ await self._begin_call(settings) return self
[docs] async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: """Calls commit on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ if self._tx_state._already_in(QueryTxStateEnum.COMMITTED): return if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: self._tx_state._change_state(QueryTxStateEnum.COMMITTED) return await self._ensure_prev_stream_finished() await self._commit_call(settings)
[docs] async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ if self._tx_state._already_in(QueryTxStateEnum.ROLLBACKED): return if self._tx_state._state == QueryTxStateEnum.NOT_INITIALIZED: self._tx_state._change_state(QueryTxStateEnum.ROLLBACKED) return await self._ensure_prev_stream_finished() await self._rollback_call(settings)
[docs] async def execute( self, query: str, parameters: Optional[dict] = None, commit_tx: Optional[bool] = False, syntax: Optional[base.QuerySyntax] = None, exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, settings: Optional[BaseRequestSettings] = None, ) -> AsyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit. :param syntax: Syntax of the query, which is a one from the following choises: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. :param exec_mode: Exec mode of the query, which is a one from the following choises: 1) QueryExecMode.EXECUTE, which is default; 2) QueryExecMode.EXPLAIN; 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; :return: Iterator with result sets """ await self._ensure_prev_stream_finished() stream_it = await self._execute_call( query=query, commit_tx=commit_tx, syntax=syntax, exec_mode=exec_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, settings=settings, ) self._prev_stream = AsyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( rpc_state=None, response_pb=resp, session_state=self._session_state, tx=self, commit_tx=commit_tx, settings=self.session._settings, ), ) return self._prev_stream