# -*- coding: utf-8 -*-
from __future__ import annotations
from google.protobuf import text_format
import enum
import queue
import typing
from typing import ClassVar, Optional, Iterable, Any, Union, Protocol, runtime_checkable
from . import _apis
# Workaround for good IDE and universal for runtime
if typing.TYPE_CHECKING:
from ._grpc.v4.protos import ydb_issue_message_pb2
else:
from ._grpc.common.protos import ydb_issue_message_pb2
@runtime_checkable
class _StatusResponseProtocol(Protocol):
"""Protocol for objects that have status and issues attributes."""
@property
def status(self) -> Union[StatusCode, int]:
...
@property
def issues(self) -> Iterable[Any]:
...
_TRANSPORT_STATUSES_FIRST = 401000
_CLIENT_STATUSES_FIRST = 402000
@enum.unique
class StatusCode(enum.IntEnum):
STATUS_CODE_UNSPECIFIED = _apis.StatusIds.STATUS_CODE_UNSPECIFIED
SUCCESS = _apis.StatusIds.SUCCESS
BAD_REQUEST = _apis.StatusIds.BAD_REQUEST
UNAUTHORIZED = _apis.StatusIds.UNAUTHORIZED
INTERNAL_ERROR = _apis.StatusIds.INTERNAL_ERROR
ABORTED = _apis.StatusIds.ABORTED
UNAVAILABLE = _apis.StatusIds.UNAVAILABLE
OVERLOADED = _apis.StatusIds.OVERLOADED
SCHEME_ERROR = _apis.StatusIds.SCHEME_ERROR
GENERIC_ERROR = _apis.StatusIds.GENERIC_ERROR
TIMEOUT = _apis.StatusIds.TIMEOUT
BAD_SESSION = _apis.StatusIds.BAD_SESSION
PRECONDITION_FAILED = _apis.StatusIds.PRECONDITION_FAILED
ALREADY_EXISTS = _apis.StatusIds.ALREADY_EXISTS
NOT_FOUND = _apis.StatusIds.NOT_FOUND
SESSION_EXPIRED = _apis.StatusIds.SESSION_EXPIRED
CANCELLED = _apis.StatusIds.CANCELLED
UNDETERMINED = _apis.StatusIds.UNDETERMINED
UNSUPPORTED = _apis.StatusIds.UNSUPPORTED
SESSION_BUSY = _apis.StatusIds.SESSION_BUSY
EXTERNAL_ERROR = _apis.StatusIds.EXTERNAL_ERROR
CONNECTION_LOST = _TRANSPORT_STATUSES_FIRST + 10
CONNECTION_FAILURE = _TRANSPORT_STATUSES_FIRST + 20
DEADLINE_EXCEEDED = _TRANSPORT_STATUSES_FIRST + 30
CLIENT_INTERNAL_ERROR = _TRANSPORT_STATUSES_FIRST + 40
UNIMPLEMENTED = _TRANSPORT_STATUSES_FIRST + 50
UNAUTHENTICATED = _CLIENT_STATUSES_FIRST + 30
SESSION_POOL_EMPTY = _CLIENT_STATUSES_FIRST + 40
SESSION_POOL_CLOSED = _CLIENT_STATUSES_FIRST + 50
# TODO: convert from proto IssueMessage
class _IssueMessage:
def __init__(self, message: str, issue_code: int, severity: int, issues) -> None:
self.message = message
self.issue_code = issue_code
self.severity = severity
self.issues = issues
[docs]
class Error(Exception):
status: ClassVar[Optional[StatusCode]] = None
def __init__(self, message: str, issues: typing.Optional[typing.Iterable[_IssueMessage]] = None):
super(Error, self).__init__(message)
self.issues = issues
self.message = message
class TruncatedResponseError(Error):
status: ClassVar[Optional[StatusCode]] = None
[docs]
class ConnectionError(Error):
status: ClassVar[Optional[StatusCode]] = None
[docs]
class ConnectionFailure(ConnectionError):
status: ClassVar[Optional[StatusCode]] = StatusCode.CONNECTION_FAILURE
[docs]
class ConnectionLost(ConnectionError):
status: ClassVar[Optional[StatusCode]] = StatusCode.CONNECTION_LOST
[docs]
class DeadlineExceed(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.DEADLINE_EXCEEDED
class Unimplemented(ConnectionError):
status: ClassVar[Optional[StatusCode]] = StatusCode.UNIMPLEMENTED
[docs]
class Unauthenticated(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.UNAUTHENTICATED
[docs]
class BadRequest(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.BAD_REQUEST
[docs]
class Unauthorized(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.UNAUTHORIZED
class InternalError(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.INTERNAL_ERROR
[docs]
class Aborted(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.ABORTED
[docs]
class Unavailable(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.UNAVAILABLE
[docs]
class Overloaded(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.OVERLOADED
[docs]
class SchemeError(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.SCHEME_ERROR
[docs]
class GenericError(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.GENERIC_ERROR
[docs]
class BadSession(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.BAD_SESSION
[docs]
class Timeout(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.TIMEOUT
class PreconditionFailed(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.PRECONDITION_FAILED
[docs]
class NotFound(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.NOT_FOUND
[docs]
class AlreadyExists(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.ALREADY_EXISTS
class SessionExpired(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.SESSION_EXPIRED
class Cancelled(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.CANCELLED
[docs]
class Undetermined(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.UNDETERMINED
class Unsupported(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.UNSUPPORTED
class SessionBusy(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.SESSION_BUSY
class ExternalError(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.EXTERNAL_ERROR
[docs]
class SessionPoolEmpty(Error, queue.Empty):
status: ClassVar[Optional[StatusCode]] = StatusCode.SESSION_POOL_EMPTY
class SessionPoolClosed(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.SESSION_POOL_CLOSED
def __init__(self):
super().__init__("Session pool is closed.")
class ClientInternalError(Error):
status: ClassVar[Optional[StatusCode]] = StatusCode.CLIENT_INTERNAL_ERROR
class UnexpectedGrpcMessage(Error):
def __init__(self, message: str):
super().__init__(message)
def _format_issues(issues: typing.Iterable[ydb_issue_message_pb2.IssueMessage]) -> str:
if not issues:
return ""
return " ,".join(text_format.MessageToString(issue, as_utf8=False, as_one_line=True) for issue in issues)
def _format_response(response: _StatusResponseProtocol) -> str:
fmt_issues = _format_issues(response.issues)
return f"{fmt_issues} (server_code: {response.status})"
_success_status_codes = {StatusCode.STATUS_CODE_UNSPECIFIED, StatusCode.SUCCESS}
_server_side_error_map = {
StatusCode.BAD_REQUEST: BadRequest,
StatusCode.UNAUTHORIZED: Unauthorized,
StatusCode.INTERNAL_ERROR: InternalError,
StatusCode.ABORTED: Aborted,
StatusCode.UNAVAILABLE: Unavailable,
StatusCode.OVERLOADED: Overloaded,
StatusCode.SCHEME_ERROR: SchemeError,
StatusCode.GENERIC_ERROR: GenericError,
StatusCode.TIMEOUT: Timeout,
StatusCode.BAD_SESSION: BadSession,
StatusCode.PRECONDITION_FAILED: PreconditionFailed,
StatusCode.ALREADY_EXISTS: AlreadyExists,
StatusCode.NOT_FOUND: NotFound,
StatusCode.SESSION_EXPIRED: SessionExpired,
StatusCode.CANCELLED: Cancelled,
StatusCode.UNDETERMINED: Undetermined,
StatusCode.UNSUPPORTED: Unsupported,
StatusCode.SESSION_BUSY: SessionBusy,
StatusCode.EXTERNAL_ERROR: ExternalError,
}
def _process_response(response_proto: _StatusResponseProtocol) -> None:
"""Process response and raise appropriate exception if status is not success.
:param response_proto: Any object with status and issues attributes
(Operation, ServerStatus, ExecuteQueryResponsePart, etc.)
:raises: Appropriate YDB error based on status code
"""
try:
status = StatusCode(response_proto.status)
except ValueError:
# Unknown status code from server - treat as GenericError
raise GenericError(
"Unknown status code: %s. %s" % (response_proto.status, _format_response(response_proto)),
response_proto.issues,
)
if status not in _success_status_codes:
exc_class = _server_side_error_map.get(status, GenericError)
raise exc_class(_format_response(response_proto), response_proto.issues)