import logging
from typing import (
Callable,
Optional,
List,
)
import time
import threading
import queue
from .base import QueryClientSettings
from .session import (
QuerySession,
)
from ..retries import (
RetrySettings,
retry_operation_sync,
)
from .. import issues
from .. import convert
from ..settings import BaseRequestSettings
from .._grpc.grpcwrapper import common_utils
logger = logging.getLogger(__name__)
[docs]
class QuerySessionPool:
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
def __init__(
self,
driver: common_utils.SupportedDriverType,
size: int = 100,
*,
query_client_settings: Optional[QueryClientSettings] = None,
):
"""
:param driver: A driver instance.
:param size: Max size of Session Pool.
:param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
"""
self._driver = driver
self._queue = queue.Queue()
self._current_size = 0
self._size = size
self._should_stop = threading.Event()
self._lock = threading.RLock()
self._query_client_settings = query_client_settings
def _create_new_session(self, timeout: Optional[float]):
session = QuerySession(self._driver, settings=self._query_client_settings)
session.create(settings=BaseRequestSettings().with_timeout(timeout))
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
return session
[docs]
def acquire(self, timeout: Optional[float] = None) -> QuerySession:
"""Acquire a session from Session Pool.
:param timeout: A timeout to wait in seconds.
:return A QuerySession object.
"""
start = time.monotonic()
lock_acquire_timeout = timeout if timeout is not None else -1
acquired = self._lock.acquire(timeout=lock_acquire_timeout)
try:
if self._should_stop.is_set():
logger.error("An attempt to take session from closed session pool.")
raise RuntimeError("An attempt to take session from closed session pool.")
session = None
try:
session = self._queue.get_nowait()
except queue.Empty:
pass
finish = time.monotonic()
timeout = timeout - (finish - start) if timeout is not None else None
start = time.monotonic()
if session is None and self._current_size == self._size:
try:
session = self._queue.get(block=True, timeout=timeout)
except queue.Empty:
raise issues.SessionPoolEmpty("Timeout on acquire session")
if session is not None:
if session._state.attached:
logger.debug(f"Acquired active session from queue: {session._state.session_id}")
return session
else:
self._current_size -= 1
logger.debug(f"Acquired dead session from queue: {session._state.session_id}")
logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")
finish = time.monotonic()
time_left = timeout - (finish - start) if timeout is not None else None
session = self._create_new_session(time_left)
self._current_size += 1
return session
finally:
if acquired:
self._lock.release()
[docs]
def release(self, session: QuerySession) -> None:
"""Release a session back to Session Pool."""
self._queue.put_nowait(session)
logger.debug("Session returned to queue: %s", session._state.session_id)
[docs]
def checkout(self, timeout: Optional[float] = None) -> "SimpleQuerySessionCheckout":
"""Return a Session context manager, that acquires session on enter and releases session on exit.
:param timeout: A timeout to wait in seconds.
"""
return SimpleQuerySessionCheckout(self, timeout)
[docs]
def retry_operation_sync(self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs):
"""Special interface to execute a bunch of commands with session in a safe, retriable way.
:param callee: A function, that works with session.
:param retry_settings: RetrySettings object.
:return: Result sets or exception in case of execution errors.
"""
retry_settings = RetrySettings() if retry_settings is None else retry_settings
def wrapped_callee():
with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session:
return callee(session, *args, **kwargs)
return retry_operation_sync(wrapped_callee, retry_settings)
[docs]
def execute_with_retries(
self,
query: str,
parameters: Optional[dict] = None,
retry_settings: Optional[RetrySettings] = None,
*args,
**kwargs,
) -> List[convert.ResultSet]:
"""Special interface to execute a one-shot queries in a safe, retriable way.
Note: this method loads all data from stream before return, do not use this
method with huge read queries.
:param query: A query, yql or sql text.
:param parameters: dict with parameters and YDB types;
:param retry_settings: RetrySettings object.
:return: Result sets or exception in case of execution errors.
"""
retry_settings = RetrySettings() if retry_settings is None else retry_settings
def wrapped_callee():
with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session:
it = session.execute(query, parameters, *args, **kwargs)
return [result_set for result_set in it]
return retry_operation_sync(wrapped_callee, retry_settings)
[docs]
def stop(self, timeout=None):
acquire_timeout = timeout if timeout is not None else -1
acquired = self._lock.acquire(timeout=acquire_timeout)
try:
self._should_stop.set()
while True:
try:
session = self._queue.get_nowait()
session.delete()
except queue.Empty:
break
logger.debug("All session were deleted.")
finally:
if acquired:
self._lock.release()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
def __del__(self):
self.stop()
class SimpleQuerySessionCheckout:
def __init__(self, pool: QuerySessionPool, timeout: Optional[float]):
self._pool = pool
self._timeout = timeout
self._session = None
def __enter__(self) -> QuerySession:
self._session = self._pool.acquire(self._timeout)
return self._session
def __exit__(self, exc_type, exc_val, exc_tb):
self._pool.release(self._session)