Source code for ydb.scheme

# -*- coding: utf-8 -*-
import abc
import enum
from abc import abstractmethod
from typing import Generic, TYPE_CHECKING

from . import issues, operation, settings as settings_impl, _apis
from ._typing import DriverT

if TYPE_CHECKING:
    from .driver import Driver as SyncDriver  # noqa: F401


[docs] @enum.unique class SchemeEntryType(enum.IntEnum): """ Enumerates all available entry types. """ TYPE_UNSPECIFIED = 0 DIRECTORY = 1 TABLE = 2 PERS_QUEUE_GROUP = 3 DATABASE = 4 RTMR_VOLUME = 5 BLOCK_STORE_VOLUME = 6 COORDINATION_NODE = 7 COLUMN_STORE = 12 COLUMN_TABLE = 13 SEQUENCE = 15 REPLICATION = 16 TOPIC = 17 EXTERNAL_TABLE = 18 EXTERNAL_DATA_SOURCE = 19 VIEW = 20 RESOURCE_POOL = 21 TRANSFER = 23 SYS_VIEW = 24 @classmethod def _missing_(cls, value): return cls.TYPE_UNSPECIFIED
[docs] @staticmethod def is_table(entry): """ Deprecated, use is_row_table instead of this. :param entry: A scheme entry to check :return: True if scheme entry is a row table and False otherwise (same as is_row_table) """ return entry == SchemeEntryType.TABLE
[docs] @staticmethod def is_any_table(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is table (independent of table type) and False otherwise """ return entry in (SchemeEntryType.TABLE, SchemeEntryType.COLUMN_TABLE)
[docs] @staticmethod def is_column_table(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a column table and False otherwise """ return entry == SchemeEntryType.COLUMN_TABLE
[docs] @staticmethod def is_column_store(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a column store and False otherwise """ return entry == SchemeEntryType.COLUMN_STORE
[docs] @staticmethod def is_row_table(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a row table and False otherwise (same as is_table) """ return entry == SchemeEntryType.TABLE
[docs] @staticmethod def is_directory(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a directory and False otherwise """ return entry == SchemeEntryType.DIRECTORY
[docs] @staticmethod def is_database(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a database and False otherwise """ return entry == SchemeEntryType.DATABASE
[docs] @staticmethod def is_coordination_node(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a coordination node and False otherwise """ return entry == SchemeEntryType.COORDINATION_NODE
[docs] @staticmethod def is_directory_or_database(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a directory or database and False otherwise """ return entry == SchemeEntryType.DATABASE or entry == SchemeEntryType.DIRECTORY
[docs] @staticmethod def is_external_table(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is an external table and False otherwise """ return entry == SchemeEntryType.EXTERNAL_TABLE
[docs] @staticmethod def is_external_data_source(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is an external data source and False otherwise """ return entry == SchemeEntryType.EXTERNAL_DATA_SOURCE
[docs] @staticmethod def is_view(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a view and False otherwise """ return entry == SchemeEntryType.VIEW
[docs] @staticmethod def is_resource_pool(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a resource pool and False otherwise """ return entry == SchemeEntryType.RESOURCE_POOL
[docs] @staticmethod def is_topic(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a topic and False otherwise """ return entry == SchemeEntryType.TOPIC
[docs] @staticmethod def is_sysview(entry): """ :param entry: A scheme entry to check :return: True if scheme entry is a system view and False otherwise """ return entry == SchemeEntryType.SYS_VIEW
[docs] class SchemeEntry(object): __slots__ = ( "name", "owner", "type", "effective_permissions", "permissions", "size_bytes", ) def __init__(self, name, owner, type, effective_permissions, permissions, size_bytes, *args, **kwargs): """ Represents a scheme entry. :param name: A name of a scheme entry :param owner: A owner of a scheme entry :param type: A type of scheme entry :param effective_permissions: A list of effective permissions applied to this scheme entry :param permissions: A list of permissions applied to this scheme entry :param size_bytes: Size of entry in bytes """ self.name = name self.owner = owner self.type = type self.effective_permissions = effective_permissions self.permissions = permissions self.size_bytes = size_bytes
[docs] def is_directory(self): """ :return: True if scheme entry is a directory and False otherwise """ return SchemeEntryType.is_directory(self.type)
[docs] def is_column_store(self): """ :return: True if scheme entry is a column store and False otherwise """ return SchemeEntryType.is_column_store(self.type)
[docs] def is_table(self): """ :return: True if scheme entry is a row table and False otherwise (same as is_row_table) """ return SchemeEntryType.is_table(self.type)
[docs] def is_column_table(self): """ :return: True if scheme entry is a column table and False otherwise (same as is_row_table) """ return SchemeEntryType.is_column_table(self.type)
[docs] def is_row_table(self): """ :return: True if scheme entry is a row table and False otherwise (same as is_table) """ return SchemeEntryType.is_table(self.type)
[docs] def is_any_table(self): """ :return: True if scheme entry is table (independent of table type) and False otherwise """ return SchemeEntryType.is_any_table(self.type)
[docs] def is_database(self): """ :return: True if scheme entry is a database and False otherwise """ return SchemeEntryType.is_database(self.type)
[docs] def is_directory_or_database(self): """ :return: True if scheme entry is a directory or a database and False otherwise """ return SchemeEntryType.is_directory_or_database(self.type)
[docs] def is_coordination_node(self): """ :return: True if scheme entry is a coordination node and False otherwise """ return SchemeEntryType.is_coordination_node(self.type)
[docs] def is_external_table(self): """ :return: True if scheme entry is an external table and False otherwise """ return SchemeEntryType.is_external_table(self.type)
[docs] def is_external_data_source(self): """ :return: True if scheme entry is an external data source and False otherwise """ return SchemeEntryType.is_external_data_source(self.type)
[docs] def is_view(self): """ :return: True if scheme entry is a view and False otherwise """ return SchemeEntryType.is_view(self.type)
[docs] def is_resource_pool(self): """ :return: True if scheme entry is a resource pool and False otherwise """ return SchemeEntryType.is_resource_pool(self.type)
[docs] def is_sysview(self): """ :return: True if scheme entry is a system view and False otherwise """ return SchemeEntryType.is_sysview(self.type)
class Directory(SchemeEntry): __slots__ = ("children",) def __init__(self, name, owner, type, effective_permissions, permissions, children, *args, **kwargs): """ Represents a directory scheme entry. :param name: A name of a scheme entry :param owner: A owner of a scheme entry :param type: A type of scheme entry :param effective_permissions: A list of effective permissions applied to this scheme entry :param permissions: A list of permissions applied to this scheme entry :param children: A list of children """ super(Directory, self).__init__(name, owner, type, effective_permissions, permissions, 0) self.children = children def _describe_path_request_factory(path): request = _apis.ydb_scheme.DescribePathRequest() request.path = path return request def _list_directory_request_factory(path): request = _apis.ydb_scheme.ListDirectoryRequest() request.path = path return request def _remove_directory_request_factory(path): request = _apis.ydb_scheme.RemoveDirectoryRequest() request.path = path return request def _make_directory_request_factory(path): request = _apis.ydb_scheme.MakeDirectoryRequest() request.path = path return request class MakeDirectorySettings(settings_impl.BaseRequestSettings): pass class RemoveDirectorySettings(settings_impl.BaseRequestSettings): pass class ListDirectorySettings(settings_impl.BaseRequestSettings): pass class DescribePathSettings(settings_impl.BaseRequestSettings): pass class ModifyPermissionsSettings(settings_impl.BaseRequestSettings): def __init__(self): super(ModifyPermissionsSettings, self).__init__() self._pb = _apis.ydb_scheme.ModifyPermissionsRequest() def grant_permissions(self, subject, permission_names): permission_action = self._pb.actions.add() permission_action.grant.MergeFrom(Permissions(subject, permission_names).to_pb()) return self def revoke_permissions(self, subject, permission_names): permission_action = self._pb.actions.add() permission_action.revoke.MergeFrom(Permissions(subject, permission_names).to_pb()) return self def set_permissions(self, subject, permission_names): permission_action = self._pb.actions.add() permission_action.set.MergeFrom(Permissions(subject, permission_names).to_pb()) return self def change_owner(self, owner): permission_action = self._pb.actions.add() permission_action.change_owner = owner return self def clear_permissions(self): self._pb.clear_permissions = True return self def to_pb(self): return self._pb class Permissions(object): __slots__ = ("subject", "permission_names") def __init__(self, subject, permission_names): """ Represents permissions :param subject: A subject of permission names :param permission_names: A list of permission names """ self.subject = subject self.permission_names = permission_names def to_pb(self): """ :return: A protocol buffer representation of permissions """ pb = _apis.ydb_scheme.Permissions() pb.subject = self.subject pb.permission_names.extend(self.permission_names) return pb def _modify_permissions_request_factory(path, settings): """ Constructs modify permissions request :param path: A path to apply permissions :param settings: An instance of ModifyPermissionsSettings :return: A constructed request """ modify_permissions_request = settings.to_pb() modify_permissions_request.path = path return modify_permissions_request def _wrap_permissions(permissions): """ Wraps permissions protocol buffers into native Python objects :param permissions: A protocol buffer representation of permissions :return: A iterable of permissions """ return tuple(Permissions(permission.subject, permission.permission_names) for permission in permissions) def _wrap_scheme_entry(entry_pb, scheme_entry_cls=None, *args, **kwargs): """ Wraps scheme entry into native Python objects. :param entry_pb: A protocol buffer representation of a scheme entry :param scheme_entry_cls: A native Python class that represents scheme entry ( by default that is generic SchemeEntry) :param args: A list of optional arguments :param kwargs: A dictionary of with optional arguments :return: A native Python reprensentation of scheme entry """ scheme_entry_cls = SchemeEntry if scheme_entry_cls is None else scheme_entry_cls return scheme_entry_cls( entry_pb.name, entry_pb.owner, SchemeEntryType(entry_pb.type), _wrap_permissions(entry_pb.effective_permissions), _wrap_permissions(entry_pb.permissions), entry_pb.size_bytes, *args, **kwargs ) def _wrap_list_directory_response(rpc_state, response): """ Wraps list directory response :param response: A list directory response :return: A directory """ issues._process_response(response.operation) message = _apis.ydb_scheme.ListDirectoryResult() response.operation.result.Unpack(message) children = [] supported_items = set(i.value for i in SchemeEntryType) for children_item in message.children: if children_item.type not in supported_items: continue children.append(_wrap_scheme_entry(children_item)) return Directory( message.self.name, message.self.owner, SchemeEntryType(message.self.type), _wrap_permissions(message.self.effective_permissions), _wrap_permissions(message.self.permissions), tuple(children), ) def _wrap_describe_path_response(rpc_state, response): issues._process_response(response.operation) message = _apis.ydb_scheme.DescribePathResult() response.operation.result.Unpack(message) return _wrap_scheme_entry(message.self) class ISchemeClient(abc.ABC): @abstractmethod def __init__(self, driver): pass @abstractmethod def make_directory(self, path, settings): pass @abstractmethod def remove_directory(self, path, settings): pass @abstractmethod def list_directory(self, path, settings): pass @abstractmethod def describe_path(self, path, settings): pass @abstractmethod def modify_permissions(self, path, settings): """ Modifies permissions for provided scheme entry :param path: A path of scheme entry :param settings: An instance of ModifyPermissionsSettings :return: An operation if success or exception on case of failure """ pass class BaseSchemeClient(ISchemeClient, Generic[DriverT]): __slots__ = ("_driver",) _driver: DriverT def __init__(self, driver: DriverT) -> None: self._driver = driver def make_directory(self, path, settings=None): return self._driver( _make_directory_request_factory(path), _apis.SchemeService.Stub, _apis.SchemeService.MakeDirectory, operation.Operation, settings, ) def remove_directory(self, path, settings=None): return self._driver( _remove_directory_request_factory(path), _apis.SchemeService.Stub, _apis.SchemeService.RemoveDirectory, operation.Operation, settings, ) def list_directory(self, path, settings=None): return self._driver( _list_directory_request_factory(path), _apis.SchemeService.Stub, _apis.SchemeService.ListDirectory, _wrap_list_directory_response, settings, ) def describe_path(self, path, settings=None): return self._driver( _describe_path_request_factory(path), _apis.SchemeService.Stub, _apis.SchemeService.DescribePath, _wrap_describe_path_response, settings, ) def modify_permissions(self, path, settings): """ Modifies permissions for provided scheme entry :param path: A path of scheme entry :param settings: An instance of ModifyPermissionsSettings :return: An operation if success or exception on case of failure """ return self._driver( _modify_permissions_request_factory(path, settings), _apis.SchemeService.Stub, _apis.SchemeService.ModifyPermissions, operation.Operation, settings, )
[docs] class SchemeClient(BaseSchemeClient["SyncDriver"]):
[docs] def async_make_directory(self, path, settings=None): return self._driver.future( _make_directory_request_factory(path), _apis.SchemeService.Stub, _apis.SchemeService.MakeDirectory, operation.Operation, settings, )
[docs] def async_remove_directory(self, path, settings=None): return self._driver.future( _remove_directory_request_factory(path), _apis.SchemeService.Stub, _apis.SchemeService.RemoveDirectory, operation.Operation, settings, )
[docs] def async_list_directory(self, path, settings=None): return self._driver.future( _list_directory_request_factory(path), _apis.SchemeService.Stub, _apis.SchemeService.ListDirectory, _wrap_list_directory_response, settings, )
[docs] def async_describe_path(self, path, settings=None): return self._driver.future( _describe_path_request_factory(path), _apis.SchemeService.Stub, _apis.SchemeService.DescribePath, _wrap_describe_path_response, settings, )
[docs] def async_modify_permissions(self, path, settings): """ Modifies permissions for provided scheme entry :param path: A path of scheme entry :param settings: An instance of ModifyPermissionsSettings :return: An future of computation """ return self._driver.future( _modify_permissions_request_factory(path, settings), _apis.SchemeService.Stub, _apis.SchemeService.ModifyPermissions, operation.Operation, settings, )