[docs]classQuerySessionPool:"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""def__init__(self,driver:common_utils.SupportedDriverType,size:int=100,*,query_client_settings:Optional[QueryClientSettings]=None,):""" :param driver: A driver instance. :param size: Max size of Session Pool. :param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior """self._driver=driverself._queue=queue.Queue()self._current_size=0self._size=sizeself._should_stop=threading.Event()self._lock=threading.RLock()self._query_client_settings=query_client_settingsdef_create_new_session(self,timeout:Optional[float]):session=QuerySession(self._driver,settings=self._query_client_settings)session.create(settings=BaseRequestSettings().with_timeout(timeout))logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")returnsession
[docs]defacquire(self,timeout:Optional[float]=None)->QuerySession:"""Acquire a session from Session Pool. :param timeout: A timeout to wait in seconds. :return A QuerySession object. """start=time.monotonic()lock_acquire_timeout=timeoutiftimeoutisnotNoneelse-1acquired=self._lock.acquire(timeout=lock_acquire_timeout)try:ifself._should_stop.is_set():logger.error("An attempt to take session from closed session pool.")raiseRuntimeError("An attempt to take session from closed session pool.")session=Nonetry:session=self._queue.get_nowait()exceptqueue.Empty:passfinish=time.monotonic()timeout=timeout-(finish-start)iftimeoutisnotNoneelseNonestart=time.monotonic()ifsessionisNoneandself._current_size==self._size:try:session=self._queue.get(block=True,timeout=timeout)exceptqueue.Empty:raiseissues.SessionPoolEmpty("Timeout on acquire session")ifsessionisnotNone:ifsession._state.attached:logger.debug(f"Acquired active session from queue: {session._state.session_id}")returnsessionelse:self._current_size-=1logger.debug(f"Acquired dead session from queue: {session._state.session_id}")logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")finish=time.monotonic()time_left=timeout-(finish-start)iftimeoutisnotNoneelseNonesession=self._create_new_session(time_left)self._current_size+=1returnsessionfinally:ifacquired:self._lock.release()
[docs]defrelease(self,session:QuerySession)->None:"""Release a session back to Session Pool."""self._queue.put_nowait(session)logger.debug("Session returned to queue: %s",session._state.session_id)
[docs]defcheckout(self,timeout:Optional[float]=None)->"SimpleQuerySessionCheckout":"""Return a Session context manager, that acquires session on enter and releases session on exit. :param timeout: A timeout to wait in seconds. """returnSimpleQuerySessionCheckout(self,timeout)
[docs]defretry_operation_sync(self,callee:Callable,retry_settings:Optional[RetrySettings]=None,*args,**kwargs):"""Special interface to execute a bunch of commands with session in a safe, retriable way. :param callee: A function, that works with session. :param retry_settings: RetrySettings object. :return: Result sets or exception in case of execution errors. """retry_settings=RetrySettings()ifretry_settingsisNoneelseretry_settingsdefwrapped_callee():withself.checkout(timeout=retry_settings.max_session_acquire_timeout)assession:returncallee(session,*args,**kwargs)returnretry_operation_sync(wrapped_callee,retry_settings)
[docs]defretry_tx_sync(self,callee:Callable,tx_mode:Optional[BaseQueryTxMode]=None,retry_settings:Optional[RetrySettings]=None,*args,**kwargs,):"""Special interface to execute a bunch of commands with transaction in a safe, retriable way. :param callee: A function, that works with session. :param tx_mode: Transaction mode, which is a one from the following choises: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); 4) QueryStaleReadOnly(). :param retry_settings: RetrySettings object. :return: Result sets or exception in case of execution errors. """tx_mode=tx_modeiftx_modeelse_ydb_query_public.QuerySerializableReadWrite()retry_settings=RetrySettings()ifretry_settingsisNoneelseretry_settingsdefwrapped_callee():withself.checkout(timeout=retry_settings.max_session_acquire_timeout)assession:withsession.transaction(tx_mode=tx_mode)astx:iftx_mode.namein["serializable_read_write","snapshot_read_only"]:tx.begin()result=callee(tx,*args,**kwargs)tx.commit()returnresultreturnretry_operation_sync(wrapped_callee,retry_settings)
[docs]defexecute_with_retries(self,query:str,parameters:Optional[dict]=None,retry_settings:Optional[RetrySettings]=None,*args,**kwargs,)->List[convert.ResultSet]:"""Special interface to execute a one-shot queries in a safe, retriable way. Note: this method loads all data from stream before return, do not use this method with huge read queries. :param query: A query, yql or sql text. :param parameters: dict with parameters and YDB types; :param retry_settings: RetrySettings object. :return: Result sets or exception in case of execution errors. """retry_settings=RetrySettings()ifretry_settingsisNoneelseretry_settingsdefwrapped_callee():withself.checkout(timeout=retry_settings.max_session_acquire_timeout)assession:it=session.execute(query,parameters,*args,**kwargs)return[result_setforresult_setinit]returnretry_operation_sync(wrapped_callee,retry_settings)
[docs]defstop(self,timeout=None):acquire_timeout=timeoutiftimeoutisnotNoneelse-1acquired=self._lock.acquire(timeout=acquire_timeout)try:self._should_stop.set()whileTrue:try:session=self._queue.get_nowait()session.delete()exceptqueue.Empty:breaklogger.debug("All session were deleted.")finally:ifacquired:self._lock.release()