# -*- coding: utf-8 -*-
import abc
import enum
from abc import abstractmethod
from . import issues, operation, settings as settings_impl, _apis
@enum.unique
class SchemeEntryType(enum.IntEnum):
"""
Enumerates all available entry types.
"""
TYPE_UNSPECIFIED = 0
DIRECTORY = 1
TABLE = 2
PERS_QUEUE_GROUP = 3
DATABASE = 4
RTMR_VOLUME = 5
BLOCK_STORE_VOLUME = 6
COORDINATION_NODE = 7
COLUMN_STORE = 12
COLUMN_TABLE = 13
SEQUENCE = 15
REPLICATION = 16
TOPIC = 17
@classmethod
def _missing_(cls, value):
return cls.TYPE_UNSPECIFIED
@staticmethod
def is_table(entry):
"""
Deprecated, use is_row_table instead of this.
:param entry: A scheme entry to check
:return: True if scheme entry is a row table and False otherwise (same as is_row_table)
"""
return entry == SchemeEntryType.TABLE
@staticmethod
def is_any_table(entry):
"""
:param entry: A scheme entry to check
:return: True if scheme entry is table (independent of table type) and False otherwise
"""
return entry in (SchemeEntryType.TABLE, SchemeEntryType.COLUMN_TABLE)
@staticmethod
def is_column_table(entry):
"""
:param entry: A scheme entry to check
:return: True if scheme entry is a column table and False otherwise
"""
return entry == SchemeEntryType.COLUMN_TABLE
@staticmethod
def is_column_store(entry):
"""
:param entry: A scheme entry to check
:return: True if scheme entry is a column store and False otherwise
"""
return entry == SchemeEntryType.COLUMN_STORE
@staticmethod
def is_row_table(entry):
"""
:param entry: A scheme entry to check
:return: True if scheme entry is a row table and False otherwise (same as is_table)
"""
return entry == SchemeEntryType.TABLE
@staticmethod
def is_directory(entry):
"""
:param entry: A scheme entry to check
:return: True if scheme entry is a directory and False otherwise
"""
return entry == SchemeEntryType.DIRECTORY
@staticmethod
def is_database(entry):
"""
:param entry: A scheme entry to check
:return: True if scheme entry is a database and False otherwise
"""
return entry == SchemeEntryType.DATABASE
@staticmethod
def is_coordination_node(entry):
"""
:param entry: A scheme entry to check
:return: True if scheme entry is a coordination node and False otherwise
"""
return entry == SchemeEntryType.COORDINATION_NODE
@staticmethod
def is_directory_or_database(entry):
"""
:param entry: A scheme entry to check
:return: True if scheme entry is a directory or database and False otherwise
"""
return entry == SchemeEntryType.DATABASE or entry == SchemeEntryType.DIRECTORY
class SchemeEntry(object):
__slots__ = (
"name",
"owner",
"type",
"effective_permissions",
"permissions",
"size_bytes",
)
def __init__(self, name, owner, type, effective_permissions, permissions, size_bytes, *args, **kwargs):
"""
Represents a scheme entry.
:param name: A name of a scheme entry
:param owner: A owner of a scheme entry
:param type: A type of scheme entry
:param effective_permissions: A list of effective permissions applied to this scheme entry
:param permissions: A list of permissions applied to this scheme entry
:param size_bytes: Size of entry in bytes
"""
self.name = name
self.owner = owner
self.type = type
self.effective_permissions = effective_permissions
self.permissions = permissions
self.size_bytes = size_bytes
def is_directory(self):
"""
:return: True if scheme entry is a directory and False otherwise
"""
return SchemeEntryType.is_directory(self.type)
def is_column_store(self):
"""
:return: True if scheme entry is a column store and False otherwise
"""
return SchemeEntryType.is_column_store(self.type)
def is_table(self):
"""
:return: True if scheme entry is a row table and False otherwise (same as is_row_table)
"""
return SchemeEntryType.is_table(self.type)
def is_column_table(self):
"""
:return: True if scheme entry is a column table and False otherwise (same as is_row_table)
"""
return SchemeEntryType.is_column_table(self.type)
def is_row_table(self):
"""
:return: True if scheme entry is a row table and False otherwise (same as is_table)
"""
return SchemeEntryType.is_table(self.type)
def is_any_table(self):
"""
:return: True if scheme entry is table (independent of table type) and False otherwise
"""
return SchemeEntryType.is_any_table(self.type)
def is_database(self):
"""
:return: True if scheme entry is a database and False otherwise
"""
return SchemeEntryType.is_database(self.type)
def is_directory_or_database(self):
"""
:return: True if scheme entry is a directory or a database and False otherwise
"""
return SchemeEntryType.is_directory_or_database(self.type)
def is_coordination_node(self):
"""
:return: True if scheme entry is a coordination node and False otherwise
"""
return SchemeEntryType.is_coordination_node(self.type)
class Directory(SchemeEntry):
__slots__ = ("children",)
def __init__(self, name, owner, type, effective_permissions, permissions, children, *args, **kwargs):
"""
Represents a directory scheme entry.
:param name: A name of a scheme entry
:param owner: A owner of a scheme entry
:param type: A type of scheme entry
:param effective_permissions: A list of effective permissions applied to this scheme entry
:param permissions: A list of permissions applied to this scheme entry
:param children: A list of children
"""
super(Directory, self).__init__(name, owner, type, effective_permissions, permissions, 0)
self.children = children
def _describe_path_request_factory(path):
request = _apis.ydb_scheme.DescribePathRequest()
request.path = path
return request
def _list_directory_request_factory(path):
request = _apis.ydb_scheme.ListDirectoryRequest()
request.path = path
return request
def _remove_directory_request_factory(path):
request = _apis.ydb_scheme.RemoveDirectoryRequest()
request.path = path
return request
def _make_directory_request_factory(path):
request = _apis.ydb_scheme.MakeDirectoryRequest()
request.path = path
return request
class MakeDirectorySettings(settings_impl.BaseRequestSettings):
pass
class RemoveDirectorySettings(settings_impl.BaseRequestSettings):
pass
class ListDirectorySettings(settings_impl.BaseRequestSettings):
pass
class DescribePathSettings(settings_impl.BaseRequestSettings):
pass
class ModifyPermissionsSettings(settings_impl.BaseRequestSettings):
def __init__(self):
super(ModifyPermissionsSettings, self).__init__()
self._pb = _apis.ydb_scheme.ModifyPermissionsRequest()
def grant_permissions(self, subject, permission_names):
permission_action = self._pb.actions.add()
permission_action.grant.MergeFrom(Permissions(subject, permission_names).to_pb())
return self
def revoke_permissions(self, subject, permission_names):
permission_action = self._pb.actions.add()
permission_action.revoke.MergeFrom(Permissions(subject, permission_names).to_pb())
return self
def set_permissions(self, subject, permission_names):
permission_action = self._pb.actions.add()
permission_action.set.MergeFrom(Permissions(subject, permission_names).to_pb())
return self
def change_owner(self, owner):
permission_action = self._pb.actions.add()
permission_action.change_owner = owner
return self
def clear_permissions(self):
self._pb.clear_permissions = True
return self
def to_pb(self):
return self._pb
class Permissions(object):
__slots__ = ("subject", "permission_names")
def __init__(self, subject, permission_names):
"""
Represents permissions
:param subject: A subject of permission names
:param permission_names: A list of permission names
"""
self.subject = subject
self.permission_names = permission_names
def to_pb(self):
"""
:return: A protocol buffer representation of permissions
"""
pb = _apis.ydb_scheme.Permissions()
pb.subject = self.subject
pb.permission_names.extend(self.permission_names)
return pb
def _modify_permissions_request_factory(path, settings):
"""
Constructs modify permissions request
:param path: A path to apply permissions
:param settings: An instance of ModifyPermissionsSettings
:return: A constructed request
"""
modify_permissions_request = settings.to_pb()
modify_permissions_request.path = path
return modify_permissions_request
def _wrap_permissions(permissions):
"""
Wraps permissions protocol buffers into native Python objects
:param permissions: A protocol buffer representation of permissions
:return: A iterable of permissions
"""
return tuple(Permissions(permission.subject, permission.permission_names) for permission in permissions)
def _wrap_scheme_entry(entry_pb, scheme_entry_cls=None, *args, **kwargs):
"""
Wraps scheme entry into native Python objects.
:param entry_pb: A protocol buffer representation of a scheme entry
:param scheme_entry_cls: A native Python class that represents scheme entry (
by default that is generic SchemeEntry)
:param args: A list of optional arguments
:param kwargs: A dictionary of with optional arguments
:return: A native Python reprensentation of scheme entry
"""
scheme_entry_cls = SchemeEntry if scheme_entry_cls is None else scheme_entry_cls
return scheme_entry_cls(
entry_pb.name,
entry_pb.owner,
SchemeEntryType(entry_pb.type),
_wrap_permissions(entry_pb.effective_permissions),
_wrap_permissions(entry_pb.permissions),
entry_pb.size_bytes,
*args,
**kwargs
)
def _wrap_list_directory_response(rpc_state, response):
"""
Wraps list directory response
:param response: A list directory response
:return: A directory
"""
issues._process_response(response.operation)
message = _apis.ydb_scheme.ListDirectoryResult()
response.operation.result.Unpack(message)
children = []
supported_items = set(i.value for i in SchemeEntryType)
for children_item in message.children:
if children_item.type not in supported_items:
continue
children.append(_wrap_scheme_entry(children_item))
return Directory(
message.self.name,
message.self.owner,
SchemeEntryType(message.self.type),
_wrap_permissions(message.self.effective_permissions),
_wrap_permissions(message.self.permissions),
tuple(children),
)
def _wrap_describe_path_response(rpc_state, response):
issues._process_response(response.operation)
message = _apis.ydb_scheme.DescribePathResult()
response.operation.result.Unpack(message)
return _wrap_scheme_entry(message.self)
class ISchemeClient(abc.ABC):
@abstractmethod
def __init__(self, driver):
pass
@abstractmethod
def make_directory(self, path, settings):
pass
@abstractmethod
def remove_directory(self, path, settings):
pass
@abstractmethod
def list_directory(self, path, settings):
pass
@abstractmethod
def describe_path(self, path, settings):
pass
@abstractmethod
def modify_permissions(self, path, settings):
"""
Modifies permissions for provided scheme entry
:param path: A path of scheme entry
:param settings: An instance of ModifyPermissionsSettings
:return: An operation if success or exception on case of failure
"""
pass
class BaseSchemeClient(ISchemeClient):
__slots__ = ("_driver",)
def __init__(self, driver):
self._driver = driver
def make_directory(self, path, settings=None):
return self._driver(
_make_directory_request_factory(path),
_apis.SchemeService.Stub,
_apis.SchemeService.MakeDirectory,
operation.Operation,
settings,
)
def remove_directory(self, path, settings=None):
return self._driver(
_remove_directory_request_factory(path),
_apis.SchemeService.Stub,
_apis.SchemeService.RemoveDirectory,
operation.Operation,
settings,
)
def list_directory(self, path, settings=None):
return self._driver(
_list_directory_request_factory(path),
_apis.SchemeService.Stub,
_apis.SchemeService.ListDirectory,
_wrap_list_directory_response,
settings,
)
def describe_path(self, path, settings=None):
return self._driver(
_describe_path_request_factory(path),
_apis.SchemeService.Stub,
_apis.SchemeService.DescribePath,
_wrap_describe_path_response,
settings,
)
def modify_permissions(self, path, settings):
"""
Modifies permissions for provided scheme entry
:param path: A path of scheme entry
:param settings: An instance of ModifyPermissionsSettings
:return: An operation if success or exception on case of failure
"""
return self._driver(
_modify_permissions_request_factory(path, settings),
_apis.SchemeService.Stub,
_apis.SchemeService.ModifyPermissions,
operation.Operation,
settings,
)
[docs]
class SchemeClient(BaseSchemeClient):
[docs]
def async_make_directory(self, path, settings=None):
return self._driver.future(
_make_directory_request_factory(path),
_apis.SchemeService.Stub,
_apis.SchemeService.MakeDirectory,
operation.Operation,
settings,
)
[docs]
def async_remove_directory(self, path, settings=None):
return self._driver.future(
_remove_directory_request_factory(path),
_apis.SchemeService.Stub,
_apis.SchemeService.RemoveDirectory,
operation.Operation,
settings,
)
[docs]
def async_list_directory(self, path, settings=None):
return self._driver.future(
_list_directory_request_factory(path),
_apis.SchemeService.Stub,
_apis.SchemeService.ListDirectory,
_wrap_list_directory_response,
settings,
)
[docs]
def async_describe_path(self, path, settings=None):
return self._driver.future(
_describe_path_request_factory(path),
_apis.SchemeService.Stub,
_apis.SchemeService.DescribePath,
_wrap_describe_path_response,
settings,
)
[docs]
def async_modify_permissions(self, path, settings):
"""
Modifies permissions for provided scheme entry
:param path: A path of scheme entry
:param settings: An instance of ModifyPermissionsSettings
:return: An future of computation
"""
return self._driver.future(
_modify_permissions_request_factory(path, settings),
_apis.SchemeService.Stub,
_apis.SchemeService.ModifyPermissions,
operation.Operation,
settings,
)