Source code for ydb.retries

import asyncio
import random
import time

from . import issues
from ._errors import check_retriable_error


class BackoffSettings(object):
    def __init__(self, ceiling=6, slot_duration=0.001, uncertain_ratio=0.5):
        self.ceiling = ceiling
        self.slot_duration = slot_duration
        self.uncertain_ratio = uncertain_ratio

    def calc_timeout(self, retry_number):
        slots_count = 1 << min(retry_number, self.ceiling)
        max_duration_ms = slots_count * self.slot_duration * 1000.0
        # duration_ms = random.random() * max_duration_ms * uncertain_ratio) + max_duration_ms * (1 - uncertain_ratio)
        duration_ms = max_duration_ms * (random.random() * self.uncertain_ratio + 1.0 - self.uncertain_ratio)
        return duration_ms / 1000.0


[docs] class RetrySettings(object): def __init__( self, max_retries=10, max_session_acquire_timeout=None, on_ydb_error_callback=None, backoff_ceiling=6, backoff_slot_duration=1, get_session_client_timeout=5, fast_backoff_settings=None, slow_backoff_settings=None, idempotent=False, ): self.max_retries = max_retries self.max_session_acquire_timeout = max_session_acquire_timeout self.on_ydb_error_callback = (lambda e: None) if on_ydb_error_callback is None else on_ydb_error_callback self.fast_backoff = BackoffSettings(10, 0.005) if fast_backoff_settings is None else fast_backoff_settings self.slow_backoff = ( BackoffSettings(backoff_ceiling, backoff_slot_duration) if slow_backoff_settings is None else slow_backoff_settings ) self.retry_not_found = True self.idempotent = idempotent self.retry_internal_error = True self.unknown_error_handler = lambda e: None self.get_session_client_timeout = get_session_client_timeout if max_session_acquire_timeout is not None: self.get_session_client_timeout = min(self.max_session_acquire_timeout, self.get_session_client_timeout)
[docs] def with_fast_backoff(self, backoff_settings): self.fast_backoff = backoff_settings return self
[docs] def with_slow_backoff(self, backoff_settings): self.slow_backoff = backoff_settings return self
class YdbRetryOperationSleepOpt(object): def __init__(self, timeout): self.timeout = timeout def __eq__(self, other): return type(self) == type(other) and self.timeout == other.timeout def __repr__(self): return "YdbRetryOperationSleepOpt(%s)" % self.timeout class YdbRetryOperationFinalResult(object): def __init__(self, result): self.result = result self.exc = None def __eq__(self, other): return type(self) == type(other) and self.result == other.result and self.exc == other.exc def __repr__(self): return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc) def set_exception(self, exc): self.exc = exc def retry_operation_impl(callee, retry_settings=None, *args, **kwargs): retry_settings = RetrySettings() if retry_settings is None else retry_settings status = None for attempt in range(retry_settings.max_retries + 1): try: result = YdbRetryOperationFinalResult(callee(*args, **kwargs)) yield result if result.exc is not None: raise result.exc except issues.Error as e: status = e retry_settings.on_ydb_error_callback(e) retriable_info = check_retriable_error(e, retry_settings, attempt) if not retriable_info.is_retriable: raise skip_yield_error_types = [ issues.Aborted, issues.BadSession, issues.NotFound, issues.InternalError, ] yield_sleep = True for t in skip_yield_error_types: if isinstance(e, t): yield_sleep = False if yield_sleep: yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds) except Exception as e: # you should provide your own handler you want retry_settings.unknown_error_handler(e) raise raise status def retry_operation_sync(callee, retry_settings=None, *args, **kwargs): opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs) for next_opt in opt_generator: if isinstance(next_opt, YdbRetryOperationSleepOpt): time.sleep(next_opt.timeout) else: return next_opt.result async def retry_operation_async(callee, retry_settings=None, *args, **kwargs): # pylint: disable=W1113 """ The retry operation helper can be used to retry a coroutine that raises YDB specific exceptions. :param callee: A coroutine to retry. :param retry_settings: An instance of ydb.RetrySettings that describes how the coroutine should be retried. If None, default instance of retry settings will be used. :param args: A tuple with positional arguments to be passed into the coroutine. :param kwargs: A dictionary with keyword arguments to be passed into the coroutine. Returns awaitable result of coroutine. If retries are not succussful exception is raised. """ opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs) for next_opt in opt_generator: if isinstance(next_opt, YdbRetryOperationSleepOpt): await asyncio.sleep(next_opt.timeout) else: try: return await next_opt.result except BaseException as e: # pylint: disable=W0703 next_opt.set_exception(e)