YDB Python SDK
==============

Python client for `YDB `_ — a fault-tolerant distributed SQL database.

.. toctree::
   :hidden:
   :caption: Getting Started

   overview
   quickstart

.. toctree::
   :hidden:
   :caption: Connecting

   driver

.. toctree::
   :hidden:
   :caption: Services

   query
   topic
   table
   coordination
   scheme

.. toctree::
   :hidden:
   :caption: Observability

   opentelemetry

.. toctree::
   :hidden:
   :caption: Reference

   types
   errors
   apireference

New to the SDK?
---------------

Start with :doc:`quickstart` — it shows how to install the package, connect to a
database, and run your first query in under five minutes.

Then read :doc:`driver` to understand how to configure the connection: which endpoint
and database to use, how to authenticate, and how to set up TLS. Every service in the
SDK is accessed through a ``Driver`` instance, so this page is the foundation for
everything else.

LLM-ready Documentation
-----------------------

For AI-assisted coding, use `llms.txt `_ as the documentation index and
`llms-full.txt `_ as the complete documentation context.

Running Queries
---------------

The :doc:`query` page covers the Query service — the primary API for executing YQL
statements. It explains ``QuerySessionPool``, how to run DDL and DML, how to pass
parameters safely, and how to work with transactions. Start here if you want to read
or write data.

The :doc:`types` page is a companion to query: it shows how Python values map to YDB
types and how to specify types explicitly when the automatic mapping is not enough.

Messaging
---------

The :doc:`topic` page covers the Topic service — a persistent message queue similar
to Kafka. It explains how to create topics, write messages, read and commit them, and
how topics integrate with transactions. Both synchronous and asynchronous patterns
are covered.
Table Service
-------------

The :doc:`table` page covers ``driver.table_client`` — the lower-level API for
operations that cannot be expressed in YQL: creating tables with custom partitioning,
TTL, secondary indexes, and column families; bulk loading data with ``bulk_upsert``;
and streaming full-table reads with ``read_table`` or ``scan_query``. Use this
alongside the Query service when you need fine-grained schema or data-loading control.

Distributed Coordination
------------------------

The :doc:`coordination` page covers distributed semaphores and leader election. If
you need to limit concurrent access to a shared resource across multiple processes or
hosts, this is the service to use.

Schema Management
-----------------

The :doc:`scheme` page covers ``driver.scheme_client`` — creating and removing
directories, listing directory contents, and describing any path in the YDB hierarchy
(tables, topics, coordination nodes, etc.).

Error Handling and Retries
--------------------------

The :doc:`errors` page is important reading before going to production. YDB returns
structured error codes, and the SDK's retry logic depends on them. This page explains
which errors are safe to retry, which are not, how to tune backoff settings, and how
to use the ``@ydb_retry`` decorator. Skipping this section is a common source of
production incidents.

Observability
-------------

The :doc:`opentelemetry` page explains how to add distributed tracing to your
application using OpenTelemetry. One call to ``enable_tracing()`` instruments query
sessions, transactions, and connection pool operations — so you can visualize request
flow in Jaeger, Grafana, or any OpenTelemetry-compatible backend.

API Reference
-------------

The :doc:`apireference` page contains auto-generated documentation for all public
classes and methods.
Indices and tables
==================

* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

Overview
========

Project Homepage
----------------

YDB Python SDK is hosted on GitHub at https://github.com/ydb-platform/ydb-python-sdk
under the ydb-platform organization.

Releases and project status are available on PyPI at https://pypi.org/project/ydb.

The most recent published version of this documentation should be at
https://ydb-platform.github.io/ydb-python-sdk.

Community
---------

You can ask questions in the official Telegram chats: `EN `_ | `RU `_.

Bugs and feature requests for YDB Python SDK should be reported on the
`GitHub issue tracker `_.

Quick Start
===========

Installation
------------

Prerequisites
^^^^^^^^^^^^^

* Python 3.8 or higher
* ``pip`` version 9.0.1 or higher

If necessary, upgrade your version of ``pip``::

    python -m pip install --upgrade pip

If you cannot upgrade ``pip`` due to a system-owned installation, you can run the
example in a virtualenv::

    python -m pip install virtualenv
    virtualenv venv
    source venv/bin/activate
    python -m pip install --upgrade pip

Installation via PyPI
^^^^^^^^^^^^^^^^^^^^^

To install YDB Python SDK from PyPI, run the following command::

    pip install ydb

Installation From Sources
^^^^^^^^^^^^^^^^^^^^^^^^^

To install YDB Python SDK from sources, run the following command from the root of
the repository::

    pip install -e .

Usage
-----

Import Package
^^^^^^^^^^^^^^

.. code-block:: python

    import ydb

Driver Initialization
^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    endpoint = "grpc://localhost:2136"  # your ydb endpoint
    database = "/local"  # your ydb database

    with ydb.Driver(
        endpoint=endpoint,
        database=database,
        credentials=ydb.credentials_from_env_variables(),
    ) as driver:
        driver.wait(timeout=5, fail_fast=True)

SessionPool Initialization
^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    with ydb.QuerySessionPool(driver) as pool:
        pass

Query Execution
^^^^^^^^^^^^^^^

Python SDK supports queries described by YQL syntax.
There are two primary methods for executing queries, each with different properties
and use cases:

* ``pool.execute_with_retries``:

  * Buffers the entire result set in client memory.
  * Automatically retries execution in case of retriable issues.
  * Does not allow specifying a transaction execution mode.
  * Recommended for one-off queries that are expected to produce small result sets.

* ``tx.execute``:

  * Returns an iterator over the query results, allowing processing of results that
    may not fit into client memory.
  * Retries must be handled manually via ``pool.retry_operation_sync``.
  * Allows specifying a transaction execution mode.
  * Recommended for scenarios where ``pool.execute_with_retries`` is insufficient.

Usage of ``pool.execute_with_retries()``:

.. code-block:: python

    pool.execute_with_retries("DROP TABLE IF EXISTS example")
    pool.execute_with_retries("CREATE TABLE example(key UInt64, value String, PRIMARY KEY (key))")
    pool.execute_with_retries("INSERT INTO example (key, value) VALUES (1, 'luffy')")

    res = pool.execute_with_retries("SELECT COUNT(*) AS rows_count FROM example")

    >>> res[0].rows_count
    1

Example of ``tx.execute()``:

.. code-block:: python

    def callee(session: ydb.QuerySessionSync):
        with session.transaction() as tx:
            with tx.execute(
                "INSERT INTO example (key, value) VALUES (2, 'zoro')"
            ):
                pass

            with tx.execute(
                "INSERT INTO example (key, value) VALUES (3, 'sanji')",
                commit_tx=True,
            ):
                pass

    pool.retry_operation_sync(callee)

Driver
======

The driver is the entry point for all YDB operations. It manages endpoint discovery,
connection pooling, and load balancing. All service clients (query, topic, table) are
obtained through the driver.

DriverConfig
------------

``DriverConfig`` holds all settings needed to initialize a driver. Pass it to the
``Driver`` constructor, or use shorthand keyword arguments directly on the ``Driver``.

**Constructor parameters:**

.. list-table::
   :header-rows: 1
   :widths: 25 15 60

   * - Parameter
     - Default
     - Description
   * - ``endpoint``
     - *required*
     - gRPC endpoint: ``grpc://host:port`` (plain) or ``grpcs://host:port`` (TLS).
   * - ``database``
     - ``None``
     - Full database path, e.g. ``/local`` or ``/ru-central1/b1g.../etn...``.
   * - ``credentials``
     - ``None``
     - Credentials instance. Falls back to :class:`~ydb.AnonymousCredentials` if ``None``.
   * - ``root_certificates``
     - ``None``
     - PEM-encoded CA certificate(s) as ``bytes`` for TLS verification.
   * - ``certificate_chain``
     - ``None``
     - PEM-encoded client certificate chain as ``bytes`` (mutual TLS).
   * - ``private_key``
     - ``None``
     - PEM-encoded client private key as ``bytes`` (mutual TLS).
   * - ``grpc_keep_alive_timeout``
     - ``None``
     - gRPC KeepAlive timeout in milliseconds.
   * - ``disable_discovery``
     - ``False``
     - If ``True``, skip endpoint discovery and use only the initial endpoint.
   * - ``discovery_request_timeout``
     - ``10``
     - Timeout in seconds for each discovery request.
   * - ``table_client_settings``
     - ``None``
     - :class:`~ydb.TableClientSettings` instance to configure the Table service client.
   * - ``topic_client_settings``
     - ``None``
     - :class:`~ydb.TopicClientSettings` instance to configure the Topic service client.
   * - ``query_client_settings``
     - ``None``
     - :class:`~ydb.QueryClientSettings` instance to configure the Query service client.
   * - ``compression``
     - ``None``
     - gRPC-level compression (e.g. ``grpc.Compression.Gzip``).

Creating a Driver
-----------------

There are three equivalent ways to specify the connection target:

**Using a DriverConfig object:**

.. code-block:: python

    import ydb

    config = ydb.DriverConfig(
        endpoint="grpc://localhost:2136",
        database="/local",
        credentials=ydb.credentials_from_env_variables(),
    )
    driver = ydb.Driver(config)

**Using keyword arguments directly on Driver:**

.. code-block:: python

    driver = ydb.Driver(
        endpoint="grpc://localhost:2136",
        database="/local",
        credentials=ydb.credentials_from_env_variables(),
    )

**Using a connection string:**

.. code-block:: python

    driver = ydb.Driver(
        connection_string="grpc://localhost:2136/?database=/local",
    )

The connection string format is ``<protocol>://<host>:<port>/?database=<database>``.

Waiting for the Driver to Connect
----------------------------------

After construction the driver starts endpoint discovery in the background. Call
``wait()`` before issuing any requests:

.. code-block:: python

    try:
        driver.wait(timeout=5, fail_fast=True)
    except TimeoutError:
        raise RuntimeError("Could not connect to YDB")

* ``timeout`` — how long to wait in seconds.
* ``fail_fast=True`` — raise immediately if discovery fails instead of retrying until
  timeout.

Closing the Driver
------------------

Always close the driver when done to release gRPC connections:

.. code-block:: python

    driver.stop()

Using the driver as a context manager is the recommended pattern — ``stop()`` is
called automatically on exit:

.. code-block:: python

    with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
        driver.wait(timeout=5, fail_fast=True)
        # ... use driver

Async Driver
------------

``ydb.aio.Driver`` mirrors the synchronous ``Driver`` with async methods:

.. code-block:: python

    import asyncio
    import ydb

    async def main():
        async with ydb.aio.Driver(
            endpoint="grpc://localhost:2136",
            database="/local",
            credentials=ydb.credentials_from_env_variables(),
        ) as driver:
            await driver.wait(timeout=5, fail_fast=True)
            # ... use driver

    asyncio.run(main())

Credentials
-----------

AnonymousCredentials
^^^^^^^^^^^^^^^^^^^^

No authentication. Use for local or unauthenticated deployments (`full example `_):

.. code-block:: python

    credentials = ydb.AnonymousCredentials()

AccessTokenCredentials
^^^^^^^^^^^^^^^^^^^^^^

Pass a static IAM token or API key directly (`full example `_):

.. code-block:: python

    credentials = ydb.AccessTokenCredentials("your-token")

StaticCredentials (username/password)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

(`full example `_)

.. code-block:: python

    credentials = ydb.StaticCredentials.from_user_password("user", "password")

Service Account Credentials
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Authenticate as a Yandex Cloud service account using a key file. Requires the
``ydb[yc]`` extra (``pip install ydb[yc]``) (`full example `_):

.. code-block:: python

    import os
    import ydb.iam

    credentials = ydb.iam.ServiceAccountCredentials.from_file(
        os.getenv("SA_KEY_FILE"),
    )

Metadata Credentials
^^^^^^^^^^^^^^^^^^^^^

Picks up credentials from the instance metadata service when running inside Yandex
Cloud (Compute VM, Cloud Functions, etc.). Requires ``ydb[yc]`` (`full example `_):

.. code-block:: python

    import ydb.iam

    credentials = ydb.iam.MetadataUrlCredentials()

OAuth 2.0 Token Exchange
^^^^^^^^^^^^^^^^^^^^^^^^^

For federated identity scenarios. Exchanges a subject token (e.g. a signed JWT) for a
YDB access token via an OAuth 2.0 token exchange endpoint (`full example `_):

.. code-block:: python

    import ydb.oauth2_token_exchange

    credentials = ydb.oauth2_token_exchange.Oauth2TokenExchangeCredentials(
        token_endpoint="https://auth.example.com/oauth/token",
        audience="ydb",
        subject_token_source=ydb.oauth2_token_exchange.JwtTokenSource(
            signing_method="RS256",
            private_key_file="/path/to/private.pem",
            key_id="my-key-id",
            issuer="my-issuer",
            subject="my-subject",
            audience="ydb",
        ),
    )

Credentials from environment variables
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The recommended option for cloud deployments — the SDK auto-selects the appropriate
provider based on which environment variable is set:

.. list-table::
   :header-rows: 1
   :widths: 40 60

   * - Environment variable
     - Credentials type selected
   * - ``YDB_ACCESS_TOKEN_CREDENTIALS``
     - ``AccessTokenCredentials``
   * - ``YDB_ANONYMOUS_CREDENTIALS``
     - ``AnonymousCredentials``
   * - ``YDB_METADATA_CREDENTIALS``
     - ``MetadataUrlCredentials``
   * - ``YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS``
     - ``ServiceAccountCredentials``

.. code-block:: python

    credentials = ydb.credentials_from_env_variables()

.. note::

   ``ServiceAccountCredentials`` and ``MetadataUrlCredentials`` require the
   ``ydb[yc]`` extra:

   .. code-block:: sh

       pip install ydb[yc]

TLS / Secure Connections
-------------------------

For TLS endpoints (``grpcs://``), the SDK uses the system CA bundle by default. To
supply a custom CA certificate:

.. code-block:: python

    with open("ca.pem", "rb") as f:
        ca_cert = f.read()

    driver = ydb.Driver(
        endpoint="grpcs://ydb.example.com:2135",
        database="/production/mydb",
        credentials=ydb.credentials_from_env_variables(),
        root_certificates=ca_cert,
    )

For mutual TLS (client certificate authentication):

.. code-block:: python

    driver = ydb.Driver(
        endpoint="grpcs://ydb.example.com:2135",
        database="/production/mydb",
        root_certificates=ca_cert,
        certificate_chain=client_cert_pem,
        private_key=client_key_pem,
    )

Service Clients
---------------

Once the driver is ready, access service clients via its properties:

.. list-table::
   :header-rows: 1
   :widths: 30 30 40

   * - Attribute
     - Type
     - Purpose
   * - ``driver.table_client``
     - :class:`~ydb.TableClient`
     - Schema management, bulk upsert, streaming reads. See :doc:`table`.
   * - ``driver.topic_client``
     - :class:`~ydb.TopicClient`
     - Topic writers, readers, topic management. See :doc:`topic`.
   * - ``driver.scheme_client``
     - :class:`~ydb.SchemeClient`
     - Directory and path operations. See :doc:`scheme`.

:class:`~ydb.QuerySessionPool` is constructed explicitly from the driver, not accessed
as a property:

.. code-block:: python

    with ydb.Driver(endpoint=..., database=...) as driver:
        driver.wait(timeout=5, fail_fast=True)

        with ydb.QuerySessionPool(driver) as pool:  # Query service
            pass

        topics = driver.topic_client    # Topic service
        scheme = driver.scheme_client   # Schema operations
        table = driver.table_client     # Table service

Query Service
=============

The Query service is the primary API for executing YQL queries in YDB. It supports DDL
(``CREATE TABLE``, ``DROP TABLE``) and DML (``SELECT``, ``INSERT``, ``UPDATE``,
``DELETE``) in a single unified interface with full transaction support.

QuerySessionPool
----------------

:class:`~ydb.QuerySessionPool` manages a pool of server-side sessions and provides
safe, retriable execution of queries. This is the main entry point for the Query
service.

**Create a pool:**

.. code-block:: python

    import ydb

    with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
        driver.wait(timeout=5, fail_fast=True)

        with ydb.QuerySessionPool(driver) as pool:
            pass  # ... use pool

The pool is closed automatically when used as a context manager. Call ``pool.stop()``
explicitly if not using ``with``.

**Async pool:**

.. code-block:: python

    import asyncio
    import ydb

    async def main():
        async with ydb.aio.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
            await driver.wait(timeout=5, fail_fast=True)

            async with ydb.aio.QuerySessionPool(driver) as pool:
                pass  # ... use pool

Session Acquire Timeout
^^^^^^^^^^^^^^^^^^^^^^^

When the pool is exhausted (all sessions are in use), ``acquire()`` and ``checkout()``
block until a session becomes free. The default behavior is to wait indefinitely.

**Wait indefinitely (default):**

.. code-block:: python

    session = pool.acquire()              # blocks until a session is available
    session = pool.acquire(timeout=None)  # explicit, same behavior

**Fail immediately if no session is available:**

.. code-block:: python

    from ydb import SessionPoolEmpty

    try:
        session = pool.acquire(timeout=0)
    except SessionPoolEmpty:
        # no session was available right now
        ...
**Fail after a deadline:**

.. code-block:: python

    try:
        session = pool.acquire(timeout=5.0)
    except SessionPoolEmpty:
        # no session became available within 5 seconds
        ...

The same ``timeout`` parameter is available on ``checkout()``:

.. code-block:: python

    with pool.checkout(timeout=5.0) as session:
        ...

When using the retry helpers, set ``max_session_acquire_timeout`` in
:class:`~ydb.RetrySettings` to apply a per-call timeout across all retry attempts:

.. code-block:: python

    import ydb

    settings = ydb.RetrySettings(max_session_acquire_timeout=5.0)
    pool.retry_tx_sync(callee, retry_settings=settings)

.. warning::

   **Never call a retry helper from inside another retry helper on the same pool.**

   Each retry helper holds a session for the duration of the call. If the inner call
   also needs a session and the pool is full, both sides wait for each other and the
   program deadlocks silently.

   .. code-block:: python

       # WRONG — if pool size == 1 (or all sessions are taken), this deadlocks:
       def outer(tx):
           pool.retry_tx_sync(inner)  # tries to acquire a second session while one is held

       pool.retry_tx_sync(outer)

   If you need auxiliary data, load it before entering the outer retry call, or use a
   separate pool.

execute_with_retries
--------------------

The simplest way to run a query. Buffers all results into memory, retries on transient
errors, and runs outside any transaction:

.. code-block:: python

    # DDL
    pool.execute_with_retries("CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))")

    # DML
    pool.execute_with_retries(
        "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')"
    )

    # Query with results
    result_sets = pool.execute_with_retries("SELECT id, name FROM users")
    for row in result_sets[0].rows:
        print(row["id"], row["name"])

.. note::

   ``execute_with_retries`` loads the entire result set into memory before returning.
   For large result sets use ``retry_operation_sync`` with streaming iteration instead.

**Async:**

.. code-block:: python

    result_sets = await pool.execute_with_retries("SELECT id, name FROM users")

Parameterized Queries
---------------------

Always use parameters instead of string interpolation to avoid SQL injection and to
allow the server to cache query plans.

**Implicit types** — Python values are mapped to YDB types automatically:

.. code-block:: python

    pool.execute_with_retries(
        "DECLARE $id AS Uint64; SELECT * FROM users WHERE id = $id",
        parameters={"$id": 42},
    )

**Explicit types via tuple** — pass ``(value, ydb_type)`` when the automatic mapping is
ambiguous or you need a specific type:

.. code-block:: python

    pool.execute_with_retries(
        "DECLARE $ids AS List<Int64>; SELECT * FROM users WHERE id IN $ids",
        parameters={"$ids": ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64))},
    )

**Explicit types via TypedValue:**

.. code-block:: python

    pool.execute_with_retries(
        "DECLARE $id AS Int64; SELECT * FROM users WHERE id = $id",
        parameters={"$id": ydb.TypedValue(42, ydb.PrimitiveType.Int64)},
    )

Common :class:`~ydb.PrimitiveType` values: ``Bool``, ``Int32``, ``Int64``, ``Uint32``,
``Uint64``, ``Float``, ``Double``, ``String``, ``Utf8``, ``Json``, ``Timestamp``,
``Date``, ``Datetime``.

retry_operation_sync
--------------------

Use this when you need manual control over a session — for example, to stream large
result sets or to execute multiple queries in sequence on the same session:

.. code-block:: python

    def callee(session: ydb.QuerySession):
        with session.execute("SELECT COUNT(*) AS cnt FROM users") as results:
            for result_set in results:
                print(result_set.rows[0]["cnt"])

    pool.retry_operation_sync(callee)

The pool acquires a session, calls ``callee(session)``, and retries the whole call on
retriable errors (e.g. ``UNAVAILABLE``, ``ABORTED``).

**Async:**

.. code-block:: python

    async def callee(session: ydb.aio.QuerySession):
        async with await session.execute("SELECT COUNT(*) AS cnt FROM users") as results:
            async for result_set in results:
                print(result_set.rows[0]["cnt"])

    await pool.retry_operation_async(callee)

retry_tx_sync
-------------

Use this to execute a group of queries atomically within a transaction. The pool
handles session acquisition, transaction begin, commit, and retries:

.. code-block:: python

    def callee(tx: ydb.QueryTxContext):
        with tx.execute("INSERT INTO users (id, name) VALUES (3, 'Carol')"):
            pass

    pool.retry_tx_sync(callee)

If ``callee`` raises a retriable error the whole function (including the transaction)
is re-executed from scratch — make sure ``callee`` is idempotent or relies on the
transaction's atomicity for correctness.

**Async:**

.. code-block:: python

    async def callee(tx: ydb.aio.QueryTxContext):
        async with await tx.execute("INSERT INTO users (id, name) VALUES (3, 'Carol')"):
            pass

    await pool.retry_tx_async(callee)

**With an explicit transaction mode:**

.. code-block:: python

    pool.retry_tx_sync(callee, tx_mode=ydb.QuerySnapshotReadOnly())

Transactions
------------

Transaction Modes
^^^^^^^^^^^^^^^^^

.. list-table::
   :header-rows: 1
   :widths: 35 65

   * - Mode
     - Description
   * - :class:`~ydb.QuerySerializableReadWrite`
     - Full ACID serializable isolation. Default. Supports reads and writes.
   * - :class:`~ydb.QuerySnapshotReadOnly`
     - Consistent read-only snapshot taken at transaction start.
   * - :class:`~ydb.QuerySnapshotReadWrite`
     - Snapshot reads with write support; write conflicts are possible.
   * - :class:`~ydb.QueryOnlineReadOnly`
     - Each read returns the most recent data at execution time. No cross-read
       consistency.
   * - :class:`~ydb.QueryOnlineReadOnly` ``(allow_inconsistent_reads=True)``
     - Fastest reads; even individual reads may be slightly inconsistent.
   * - :class:`~ydb.QueryStaleReadOnly`
     - Reads may lag by fractions of a second. Each individual read is consistent.
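The re-execution behaviour described for ``retry_tx_sync`` (the whole ``callee`` runs
again after a retriable error) can be illustrated without a database. The following
is a minimal, SDK-independent sketch: ``RetriableError``, ``FakeTx`` and
``retry_callee`` are hypothetical names invented for this illustration and are not
part of ``ydb``. The real SDK additionally applies backoff and classifies errors by
status code.

```python
class RetriableError(Exception):
    """Hypothetical stand-in for a transient error such as ABORTED."""


class FakeTx:
    """Hypothetical transaction: buffers writes and applies them only on commit."""

    def __init__(self, store):
        self._store = store
        self._pending = []

    def execute(self, value):
        self._pending.append(value)

    def commit(self):
        self._store.extend(self._pending)


def retry_callee(callee, store, max_attempts=3):
    """Run callee in a fresh FakeTx; on RetriableError start over from scratch."""
    for attempt in range(1, max_attempts + 1):
        tx = FakeTx(store)  # new transaction per attempt; earlier pending writes are dropped
        try:
            callee(tx)
            tx.commit()
            return attempt
        except RetriableError:
            if attempt == max_attempts:
                raise


calls = []  # side effect outside the transaction: NOT undone when the callee is retried
store = []  # stands in for table contents: changed only by a successful commit


def callee(tx):
    calls.append("called")
    tx.execute("Carol")
    if len(calls) == 1:
        raise RetriableError("simulated transient failure")


attempts = retry_callee(callee, store)
print(attempts, store, len(calls))
```

In this sketch the write reaches ``store`` exactly once because it goes through the
transaction, while the ``calls`` list grows on every attempt. Side effects performed
outside the transaction are repeated on each retry, which is why the callee should be
idempotent.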
Manual Transaction Control
^^^^^^^^^^^^^^^^^^^^^^^^^^

Use ``session.transaction()`` when you need fine-grained control:

**Synchronous:**

.. code-block:: python

    def callee(session: ydb.QuerySession):
        with session.transaction() as tx:
            tx.begin()

            with tx.execute("INSERT INTO users (id, name) VALUES (4, 'Dave')"):
                pass

            with tx.execute("SELECT COUNT(*) AS cnt FROM users") as results:
                for result_set in results:
                    print("count:", result_set.rows[0]["cnt"])

            tx.commit()
            # tx.rollback() to discard changes instead

    pool.retry_operation_sync(callee)

**Commit on last query** — avoids an extra round-trip:

.. code-block:: python

    def callee(session: ydb.QuerySession):
        with session.transaction() as tx:
            tx.begin()
            with tx.execute(
                "INSERT INTO users (id, name) VALUES (5, 'Eve')",
                commit_tx=True,
            ):
                pass

    pool.retry_operation_sync(callee)

**Async:**

.. code-block:: python

    async def callee(session: ydb.aio.QuerySession):
        async with session.transaction() as tx:
            await tx.begin()

            async with await tx.execute(
                "INSERT INTO users (id, name) VALUES (4, 'Dave')"
            ):
                pass

            async with await tx.execute(
                "SELECT COUNT(*) AS cnt FROM users"
            ) as results:
                async for result_set in results:
                    print("count:", result_set.rows[0]["cnt"])

            await tx.commit()

    await pool.retry_operation_async(callee)

.. note::

   The transaction context manager calls ``rollback()`` automatically on exit if you
   did not commit explicitly.

Working with Result Sets
------------------------

``session.execute()`` and ``tx.execute()`` return an iterator of ``ResultSet``
objects. A single query may return multiple result sets (e.g. when it contains
multiple ``SELECT`` statements).

**Synchronous iteration:**

.. code-block:: python

    with session.execute("SELECT id, name FROM users ORDER BY id") as results:
        for result_set in results:
            for row in result_set.rows:
                print(row["id"], row["name"].decode())

**Async iteration:**

.. code-block:: python

    async with await session.execute("SELECT id, name FROM users") as results:
        async for result_set in results:
            for row in result_set.rows:
                print(row["id"], row["name"].decode())

**ResultSet fields:**

.. code-block:: python

    result_set.rows       # list of Row objects — access columns by name: row["col"]
    result_set.columns    # list of column descriptors with .name and .type
    result_set.truncated  # True if the server truncated results (use pagination)

.. note::

   ``String`` columns return ``bytes``; ``Utf8`` columns return ``str``. Decode as
   needed: ``row["name"].decode("utf-8")``.

DDL Queries
-----------

DDL is executed the same way as DML — through the pool or a session:

.. code-block:: python

    pool.execute_with_retries(
        """
        CREATE TABLE orders (
            order_id Uint64,
            user_id Uint64,
            amount Double,
            created Timestamp,
            PRIMARY KEY (order_id)
        )
        """
    )

    pool.execute_with_retries("DROP TABLE IF EXISTS orders")

Complete Examples
-----------------

Synchronous
^^^^^^^^^^^

.. code-block:: python

    import ydb

    with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
        driver.wait(timeout=5, fail_fast=True)

        with ydb.QuerySessionPool(driver) as pool:
            # Schema
            pool.execute_with_retries("DROP TABLE IF EXISTS users")
            pool.execute_with_retries(
                "CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
            )

            # Insert in a transaction
            def insert(tx: ydb.QueryTxContext):
                with tx.execute(
                    "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
                    commit_tx=True,
                ):
                    pass

            pool.retry_tx_sync(insert)

            # Read
            result_sets = pool.execute_with_retries("SELECT id, name FROM users")
            for row in result_sets[0].rows:
                print(row["id"], row["name"])

Asynchronous
^^^^^^^^^^^^

.. code-block:: python

    import asyncio
    import ydb

    async def main():
        async with ydb.aio.Driver(
            endpoint="grpc://localhost:2136", database="/local"
        ) as driver:
            await driver.wait(timeout=5, fail_fast=True)

            async with ydb.aio.QuerySessionPool(driver) as pool:
                # Schema
                await pool.execute_with_retries("DROP TABLE IF EXISTS users")
                await pool.execute_with_retries(
                    "CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
                )

                # Insert in a transaction
                async def insert(tx: ydb.aio.QueryTxContext):
                    async with await tx.execute(
                        "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
                        commit_tx=True,
                    ):
                        pass

                await pool.retry_tx_async(insert)

                # Read
                result_sets = await pool.execute_with_retries(
                    "SELECT id, name FROM users"
                )
                for row in result_sets[0].rows:
                    print(row["id"], row["name"])

    asyncio.run(main())

Apache Arrow Format
-------------------

By default query results are returned as YDB value objects. For analytics workloads
you can request results in `Apache Arrow `_ IPC format, which integrates directly
with pandas, polars, and other columnar tools.

Requires ``pyarrow`` to be installed (``pip install pyarrow``).

.. code-block:: python

    import pyarrow as pa
    import ydb

    result_sets = pool.execute_with_retries(
        "SELECT id, name, score FROM users ORDER BY id LIMIT 1000",
        result_set_format=ydb.QueryResultSetFormat.ARROW,
    )

    for result_set in result_sets:
        schema = pa.ipc.read_schema(pa.py_buffer(result_set.arrow_format_meta.schema))
        batch = pa.ipc.read_record_batch(pa.py_buffer(result_set.data), schema)
        df = batch.to_pandas()
        print(df.head())

Arrow results can also be compressed on the wire. Pass ``arrow_format_settings`` to
choose a codec:

.. code-block:: python

    settings = ydb.ArrowFormatSettings(
        compression_codec=ydb.ArrowCompressionCodec(
            ydb.ArrowCompressionCodecType.ZSTD,
            level=10,
        )
    )

    result_sets = pool.execute_with_retries(
        "SELECT * FROM events LIMIT 100000",
        result_set_format=ydb.QueryResultSetFormat.ARROW,
        arrow_format_settings=settings,
    )

.. note::

   Arrow format is only useful when you process results with a columnar library. For
   regular row-by-row processing use the default ``VALUE`` format — it has no extra
   dependencies and lower overhead for small result sets.

Query Stats and Explain
-----------------------

Query Stats
^^^^^^^^^^^

YDB can return execution statistics alongside query results. Pass a
:class:`~ydb.QueryStatsMode` value as ``stats_mode`` to ``session.execute()`` or
``tx.execute()``, then read ``last_query_stats`` after the iterator is consumed:

.. code-block:: python

    def callee(session: ydb.QuerySession):
        with session.execute(
            "SELECT * FROM users",
            stats_mode=ydb.QueryStatsMode.BASIC,
        ):
            pass  # must iterate to completion for stats to be populated

        print(session.last_query_stats)

    pool.retry_operation_sync(callee)

The same works inside a transaction — stats reflect the last executed statement:

.. code-block:: python

    def callee(tx: ydb.QueryTxContext):
        with tx.execute(
            "SELECT COUNT(*) AS cnt FROM users",
            stats_mode=ydb.QueryStatsMode.FULL,
        ) as results:
            for result_set in results:
                print(result_set.rows[0]["cnt"])

        print(tx.last_query_stats)

    pool.retry_tx_sync(callee)

**Stats modes:**

.. list-table::
   :header-rows: 1
   :widths: 30 70

   * - :class:`~ydb.QueryStatsMode` value
     - What is returned
   * - ``QueryStatsMode.NONE``
     - No statistics (default).
   * - ``QueryStatsMode.BASIC``
     - Row counts and execution time per stage.
   * - ``QueryStatsMode.FULL``
     - Full per-operator statistics.
   * - ``QueryStatsMode.PROFILE``
     - Full statistics plus the query execution plan. Use this for performance
       investigations.

Explain
^^^^^^^

``explain`` returns the query execution plan without actually running the query. Use
it to understand how YDB will execute a statement before sending it to production:

.. code-block:: python

    # Returns a JSON string
    plan = pool.explain_with_retries("SELECT * FROM users WHERE id = 1")
    print(plan)

    # Returns a parsed dict
    plan_dict = pool.explain_with_retries(
        "SELECT * FROM users WHERE id = 1",
        result_format=ydb.QueryExplainResultFormat.DICT,
    )
    print(plan_dict["Plan"])

You can also call ``explain`` directly on a session:

.. code-block:: python

    def callee(session: ydb.QuerySession):
        plan = session.explain("SELECT * FROM users WHERE id = 1")
        print(plan)

    pool.retry_operation_sync(callee)

Topic Service
=============

YDB Topic Service is a persistent message queue that supports multiple producers and
consumers, horizontal scaling via partitioning, and at-least-once delivery semantics.
The Python SDK provides both synchronous and asynchronous clients with identical APIs.

Concepts
--------

**Topic** — a named stream of messages divided into one or more partitions.

**Partition** — an ordered, append-only log. Messages within a partition are delivered
in order.

**Producer** — a client that writes messages to a topic. Each producer has a
``producer_id`` that is used to deduplicate messages via sequence numbers (``seqno``).

**Consumer** — a named group of readers that share offset state. Each consumer
independently tracks its position in every partition, so multiple consumer groups can
read the same topic without interfering with each other.

Getting a Topic Client
----------------------

The topic client is available on every driver instance via the ``topic_client``
property.

**Synchronous:**

.. code-block:: python

    import ydb

    with ydb.Driver(
        endpoint="grpc://localhost:2136",
        database="/local",
        credentials=ydb.credentials_from_env_variables(),
    ) as driver:
        driver.wait(timeout=5, fail_fast=True)
        topic_client = driver.topic_client

**Asynchronous:**

.. code-block:: python

    import asyncio
    import ydb

    async def main():
        async with ydb.aio.Driver(
            endpoint="grpc://localhost:2136",
            database="/local",
            credentials=ydb.credentials_from_env_variables(),
        ) as driver:
            await driver.wait(timeout=5, fail_fast=True)
            topic_client = driver.topic_client

Topic Management
----------------

Create a Topic
^^^^^^^^^^^^^^

.. code-block:: python

    import datetime

    # Synchronous
    driver.topic_client.create_topic(
        "/local/my-topic",
        min_active_partitions=1,
        max_active_partitions=10,
        retention_period=datetime.timedelta(hours=24),
        consumers=["my-consumer"],
    )

    # Asynchronous
    await driver.topic_client.create_topic(
        "/local/my-topic",
        min_active_partitions=1,
        max_active_partitions=10,
        retention_period=datetime.timedelta(hours=24),
        consumers=["my-consumer"],
    )

Key parameters for ``create_topic``:

* ``path`` — full topic path including database prefix.
* ``min_active_partitions`` — minimum number of active partitions (default: 1).
* ``max_active_partitions`` — maximum number of partitions when auto-scaling is
  enabled.
* ``retention_period`` — how long messages are kept (``datetime.timedelta``).
* ``retention_storage_mb`` — maximum storage size per partition in megabytes.
* ``supported_codecs`` — list of :class:`~ydb.TopicCodec` values the topic accepts
  (default: RAW and GZIP).
* ``consumers`` — list of consumer names (strings) or :class:`~ydb.TopicConsumer`
  objects to create upfront.
* ``partition_write_speed_bytes_per_second`` — per-partition write throughput limit.

Alter a Topic
^^^^^^^^^^^^^

.. code-block:: python

    # Add a consumer
    driver.topic_client.alter_topic(
        "/local/my-topic",
        add_consumers=["new-consumer"],
    )

    # Change retention
    driver.topic_client.alter_topic(
        "/local/my-topic",
        set_retention_period=datetime.timedelta(hours=48),
    )

Drop a Topic
^^^^^^^^^^^^

.. code-block:: python

    driver.topic_client.drop_topic("/local/my-topic")

Describe a Topic
^^^^^^^^^^^^^^^^

.. code-block:: python

    description = driver.topic_client.describe_topic("/local/my-topic")
    print(description.partitions)
    print(description.consumers)

    # Include partition-level statistics
    description = driver.topic_client.describe_topic(
        "/local/my-topic",
        include_stats=True,
    )

Writing Messages
----------------

Create a Writer
^^^^^^^^^^^^^^^

Use ``topic_client.writer()`` as a context manager (recommended) or create it
manually.

**Synchronous:**

.. code-block:: python

    with driver.topic_client.writer("/local/my-topic") as writer:
        writer.write("hello")

**Asynchronous:**

.. code-block:: python

    async with driver.topic_client.writer("/local/my-topic") as writer:
        await writer.write("hello")

**Without context manager** (you must call ``close()`` yourself):

.. code-block:: python

    writer = driver.topic_client.writer("/local/my-topic")
    try:
        writer.write("hello")
    finally:
        writer.close()

Writer Parameters
^^^^^^^^^^^^^^^^^

.. code-block:: python

    writer = driver.topic_client.writer(
        "/local/my-topic",
        producer_id="my-producer",   # Unique producer ID; auto-generated UUID if omitted.
        partition_id=0,              # Pin to a specific partition; None = auto-select.
        codec=ydb.TopicCodec.GZIP,   # Compress messages. Default: RAW (no compression).
        auto_seqno=True,             # Auto-increment sequence numbers (default: True).
        auto_created_at=True,        # Auto-set message timestamps (default: True).
    )

* ``producer_id`` is used for deduplication: if the same ``(producer_id, seqno)`` pair
  is received again, YDB silently skips it.
* When ``auto_seqno=True`` the SDK assigns monotonically increasing sequence numbers.
  Set ``auto_seqno=False`` and provide ``seqno`` manually when you need deterministic
  IDs.

Sending Messages
^^^^^^^^^^^^^^^^

**Simple write** — non-blocking; buffers the message internally and returns
immediately:

.. code-block:: python

    writer.write("text message")
    writer.write(b"\x01\x02\x03")        # bytes
    writer.write(["msg-1", "msg-2"])     # send multiple messages in one call

**Full message form** with optional fields:

..
code-block:: python writer.write(ydb.TopicWriterMessage( data="hello", seqno=42, # omit when auto_seqno=True created_at=datetime.datetime.now(), # omit when auto_created_at=True metadata_items={"key": "value"}, )) **Write and wait for server acknowledgment:** .. code-block:: python # Blocks until the server confirms the write. result = writer.write_with_ack("important message") # Get a Future and wait for it later (synchronous client). future = writer.async_write_with_ack("important message") future.result() # blocks here # Async client — await directly. result = await writer.write_with_ack("important message") **Flush** — wait for all previously buffered messages to be acknowledged: .. code-block:: python for i in range(100): writer.write(f"message-{i}") writer.flush() # blocks until all 100 messages are acked # Async version: await writer.flush() Async Write Pattern ^^^^^^^^^^^^^^^^^^^ For high-throughput pipelines, buffer writes and gather futures: .. code-block:: python import concurrent.futures futures = [] for i in range(100): future = writer.async_write_with_ack(f"message-{i}") futures.append(future) concurrent.futures.wait(futures) for f in futures: if f.exception(): raise f.exception() Writer Backpressure ^^^^^^^^^^^^^^^^^^^ By default the writer's internal buffer is unbounded — ``write()`` always returns immediately regardless of how many unacknowledged messages are in flight. Enable backpressure by setting one or both limits: .. code-block:: python writer = driver.topic_client.writer( "/local/my-topic", max_buffer_size_bytes=50 * 1024 * 1024, # pause when 50 MB in flight max_buffer_messages=1000, # pause when 1000 messages in flight ) A message is counted as occupying the buffer from the moment it is passed to ``write()`` until the server acknowledges it. Backpressure is active when **at least one** limit is set; setting both means either limit can trigger a wait (OR semantics). 
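The accounting rules above can be illustrated with a small stand-alone model. This is only a sketch of the described semantics, not the SDK's implementation: the ``BufferModel`` class and its method names are hypothetical and exist purely for illustration.

```python
from typing import Optional


class BufferModel:
    """Toy model of the writer's in-flight accounting (hypothetical, not SDK code)."""

    def __init__(self, max_bytes: Optional[int] = None, max_messages: Optional[int] = None):
        self.max_bytes = max_bytes
        self.max_messages = max_messages
        self.bytes_in_flight = 0
        self.messages_in_flight = 0

    def on_write(self, payload: bytes) -> None:
        # A message occupies the buffer as soon as it is passed to write().
        self.bytes_in_flight += len(payload)
        self.messages_in_flight += 1

    def on_ack(self, payload: bytes) -> None:
        # The space is released only when the server acknowledges the message.
        self.bytes_in_flight -= len(payload)
        self.messages_in_flight -= 1

    def must_wait(self) -> bool:
        # Backpressure is off unless at least one limit is configured.
        over_bytes = self.max_bytes is not None and self.bytes_in_flight >= self.max_bytes
        over_messages = self.max_messages is not None and self.messages_in_flight >= self.max_messages
        # OR semantics: reaching either limit is enough to make the next write wait.
        return over_bytes or over_messages


model = BufferModel(max_bytes=10, max_messages=2)
model.on_write(b"abc")
print(model.must_wait())  # False: 3 bytes and 1 message in flight
model.on_write(b"def")
print(model.must_wait())  # True: the message limit (2) is reached
model.on_ack(b"abc")
print(model.must_wait())  # False: the acknowledgment freed a slot
```

With neither limit configured, ``must_wait()`` in this model always returns ``False``, which mirrors the unbounded default described earlier.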
The limits are **soft**: ``write()`` blocks only if the buffer is *already* at or above the limit when the call starts. Once unblocked, the entire batch is admitted regardless of its size. This means callers that batch multiple messages in a single ``write()`` call will never deadlock even when the batch is larger than the limit. **Blocking behavior (default)** When the buffer is at or above the limit, ``write()`` blocks until enough messages are acknowledged by the server. There is no timeout by default — the call waits indefinitely: .. code-block:: python # Producer pauses here if the buffer is full, then proceeds once space is freed. writer.write("message") **Timeout** Set ``buffer_wait_timeout_sec`` to raise :class:`~ydb.TopicWriterBufferFullError` if space does not free up in time. Use a positive value to wait up to that many seconds, or ``0`` to fail immediately without waiting (non-blocking): .. code-block:: python writer = driver.topic_client.writer( "/local/my-topic", max_buffer_messages=500, buffer_wait_timeout_sec=5.0, # raise after 5 seconds; use 0 to fail immediately ) try: writer.write("message") except ydb.TopicWriterBufferFullError: # handle overload — log, drop, or apply back-off ... **Async client** The async writer behaves identically — ``await writer.write()`` suspends the coroutine instead of blocking the thread: .. code-block:: python writer = driver.topic_client.writer( "/local/my-topic", max_buffer_size_bytes=4 * 1024 * 1024, buffer_wait_timeout_sec=10.0, ) try: await writer.write("message") except ydb.TopicWriterBufferFullError: ... To apply your own timeout without raising an error, wrap the call with ``asyncio.wait_for``: .. code-block:: python try: await asyncio.wait_for(writer.write("message"), timeout=2.0) except asyncio.TimeoutError: ... # timed out waiting for buffer space Reading Messages ---------------- Create a Reader ^^^^^^^^^^^^^^^ A reader requires a ``consumer`` name that must already exist on the topic. **Synchronous:** .. 
code-block:: python with driver.topic_client.reader("/local/my-topic", consumer="my-consumer") as reader: message = reader.receive_message() print(message.data.decode()) reader.commit(message) **Asynchronous:** .. code-block:: python async with driver.topic_client.reader("/local/my-topic", consumer="my-consumer") as reader: message = await reader.receive_message() print(message.data.decode()) reader.commit(message) Reader Parameters ^^^^^^^^^^^^^^^^^ .. code-block:: python reader = driver.topic_client.reader( topic="/local/my-topic", # str, TopicReaderSelector, or a list of these consumer="my-consumer", buffer_size_bytes=50 * 1024 * 1024, # client-side buffer (default: 50 MB) buffer_release_threshold=0.5, # see below (default: 0.5) ) ``buffer_size_bytes`` controls how many bytes the server is allowed to send before the client signals that it is ready for more. The server will not exceed this limit. ``buffer_release_threshold`` (float in ``[0.0, 1.0]``) controls when the client sends a new ``ReadRequest`` to the server after consuming messages from the local buffer: * ``0.0`` — send a ``ReadRequest`` immediately after every batch is consumed. Produces more round-trips when many small batches arrive. * ``> 0.0`` — accumulate freed bytes until they reach ``threshold × buffer_size_bytes``, then send a single ``ReadRequest`` covering the accumulated amount. This reduces network round-trips. The default is ``0.5``. Example — reduce round-trips for a high-throughput reader with many small messages: .. code-block:: python reader = driver.topic_client.reader( "/local/my-topic", consumer="my-consumer", buffer_size_bytes=50 * 1024 * 1024, buffer_release_threshold=0.2, # send ReadRequest after freeing 10 MiB ) To read from multiple topics at once, pass a list: .. code-block:: python reader = driver.topic_client.reader( topic=["/local/topic-a", "/local/topic-b"], consumer="my-consumer", ) To fine-tune per-topic settings (e.g. 
start offset or timestamp), use :class:`~ydb.TopicReaderSelector`: .. code-block:: python reader = driver.topic_client.reader( topic=ydb.TopicReaderSelector( path="/local/my-topic", read_from=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc), ), consumer="my-consumer", ) Receiving Messages ^^^^^^^^^^^^^^^^^^ **One message at a time:** .. code-block:: python # Synchronous — blocks until a message arrives. message = reader.receive_message() print(message.data.decode()) reader.commit(message) # With timeout — raises TimeoutError if no message arrives within 1 second. try: message = reader.receive_message(timeout=1) except TimeoutError: print("no new messages") # Asynchronous: message = await reader.receive_message() **Batch processing:** .. code-block:: python # Synchronous batch = reader.receive_batch() for message in batch.messages: process(message) reader.commit(batch) # commit the whole batch at once # With size limits batch = reader.receive_batch(max_messages=100) # Asynchronous batch = await reader.receive_batch() for message in batch.messages: process(message) reader.commit(batch) Message Fields ^^^^^^^^^^^^^^ Each received message exposes: .. code-block:: python message.data # bytes — the message payload message.seqno # int — producer sequence number message.offset # int — partition offset message.partition_id # int — partition this message came from message.producer_id # str — producer_id set by the writer message.created_at # datetime — timestamp set by the writer message.written_at # datetime — timestamp when YDB persisted the message message.metadata_items # Dict[str, bytes] — arbitrary key-value metadata Committing Offsets ^^^^^^^^^^^^^^^^^^ Committing tells YDB that the consumer has successfully processed a message. YDB resumes from the last committed offset when the reader reconnects. .. code-block:: python # Non-blocking commit — buffered and sent in the background. 
reader.commit(message) reader.commit(batch) # Blocking commit — waits for the server to acknowledge the commit. reader.commit_with_ack(message) # Async version: await reader.commit_with_ack(message) .. note:: ``commit()`` is non-blocking and sufficient for most use cases. The offset is flushed automatically when the reader is closed (``flush=True`` by default). Handling Partition Rebalancing ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ When the topic rebalances (partitions are reassigned between reader instances), unfinished messages from revoked partitions become stale. The SDK signals this via the ``alive`` property: .. code-block:: python # Per message message = reader.receive_message() # ... do some work ... if message.alive: reader.commit(message) # Per batch — check before expensive processing batch = reader.receive_batch() for message in batch.messages: if not batch.alive: break # partition revoked, skip remaining messages process(message) if batch.alive: reader.commit(batch) Transactions ------------ The SDK integrates with YDB transactions so that topic writes and reads can be made atomically together with database queries. Transactional Write ^^^^^^^^^^^^^^^^^^^ Use ``topic_client.tx_writer(tx, topic)`` inside a transaction callback: **Synchronous:** .. code-block:: python def write_with_tx(driver: ydb.Driver, topic: str): with ydb.QuerySessionPool(driver) as pool: def callee(tx: ydb.QueryTxContext): tx_writer = driver.topic_client.tx_writer(tx, topic) for i in range(10): tx_writer.write(ydb.TopicWriterMessage(f"message-{i}")) pool.retry_tx_sync(callee) **Asynchronous:** .. 
code-block:: python async def write_with_tx(driver: ydb.aio.Driver, topic: str): async with ydb.aio.QuerySessionPool(driver) as pool: async def callee(tx: ydb.aio.QueryTxContext): tx_writer = driver.topic_client.tx_writer(tx, topic) for i in range(10): await tx_writer.write(ydb.TopicWriterMessage(f"message-{i}")) await pool.retry_tx_async(callee) Messages written via a ``tx_writer`` are visible to readers only after the transaction commits. If the transaction rolls back, the messages are discarded. Transactional Read ^^^^^^^^^^^^^^^^^^ Use ``reader.receive_batch_with_tx(tx)`` to read messages inside a transaction. The commit offset is advanced atomically with the transaction: **Synchronous:** .. code-block:: python def read_with_tx(driver: ydb.Driver, topic: str, consumer: str): with driver.topic_client.reader(topic, consumer) as reader: with ydb.QuerySessionPool(driver) as pool: def callee(tx: ydb.QueryTxContext): batch = reader.receive_batch_with_tx(tx, max_messages=10) for message in batch.messages: process(message) pool.retry_tx_sync(callee) **Asynchronous:** .. code-block:: python async def read_with_tx(driver: ydb.aio.Driver, topic: str, consumer: str): async with driver.topic_client.reader(topic, consumer) as reader: async with ydb.aio.QuerySessionPool(driver) as pool: async def callee(tx: ydb.aio.QueryTxContext): batch = await reader.receive_batch_with_tx(tx, max_messages=10) for message in batch.messages: process(message) await pool.retry_tx_async(callee) .. note:: Do not call ``reader.commit()`` when using ``receive_batch_with_tx`` — the commit is handled automatically by the transaction. Auto-Partitioning ----------------- YDB can automatically scale the number of partitions up (and optionally down) based on write throughput. Enable it when creating a topic: .. 
code-block:: python import datetime import ydb driver.topic_client.create_topic( "/local/my-topic", consumers=["my-consumer"], max_active_partitions=100, auto_partitioning_settings=ydb.TopicAutoPartitioningSettings( strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP, up_utilization_percent=70, # split a partition when it reaches 70% capacity down_utilization_percent=30, # merge partitions when below 30% (SCALE_UP_AND_DOWN only) stabilization_window=datetime.timedelta(seconds=60), ), ) Available :class:`~ydb.TopicAutoPartitioningStrategy` values: * ``SCALE_UP`` — only split partitions (add capacity). * ``SCALE_UP_AND_DOWN`` — split and merge partitions automatically. * ``DISABLED`` — static partition count. The reader handles partition splits and merges transparently when ``auto_partitioning_support=True`` (which is the default): .. code-block:: python reader = driver.topic_client.reader( "/local/my-topic", consumer="my-consumer", auto_partitioning_support=True, # default ) Custom Codecs ------------- The SDK supports RAW (no compression) and GZIP out of the box. You can register custom encode/decode functions for any codec ID: **Custom encoder (writer side):** .. code-block:: python import zlib def my_compress(data: bytes) -> bytes: return zlib.compress(data) CUSTOM_CODEC = 10001 # must be in range 10000–19999 writer = driver.topic_client.writer( "/local/my-topic", codec=CUSTOM_CODEC, encoders={CUSTOM_CODEC: my_compress}, ) **Custom decoder (reader side):** .. code-block:: python def my_decompress(data: bytes) -> bytes: return zlib.decompress(data) reader = driver.topic_client.reader( "/local/my-topic", consumer="my-consumer", decoders={CUSTOM_CODEC: my_decompress}, ) For CPU-intensive codecs you can offload encoding/decoding to a thread pool: .. 
code-block:: python

    import concurrent.futures

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    writer = driver.topic_client.writer(
        "/local/my-topic",
        codec=CUSTOM_CODEC,
        encoders={CUSTOM_CODEC: my_compress},
        encoder_executor=executor,
    )

Topic Client Settings
---------------------

Pass :class:`~ydb.TopicClientSettings` to the driver config to tune SDK-wide defaults:

.. code-block:: python

    config = ydb.DriverConfig(
        endpoint="grpc://localhost:2136",
        database="/local",
    )
    config.topic_client_settings = ydb.TopicClientSettings(
        encode_decode_threads_count=4,  # threads used for codec operations (default: 1)
    )
    driver = ydb.Driver(config)

Complete Examples
-----------------

Synchronous: Write and Read
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    import ydb

    endpoint = "grpc://localhost:2136"
    database = "/local"
    topic_path = "/local/my-topic"
    consumer = "my-consumer"

    with ydb.Driver(endpoint=endpoint, database=database) as driver:
        driver.wait(timeout=5, fail_fast=True)

        # Recreate the topic: drop it if it already exists, then create it fresh.
        try:
            driver.topic_client.drop_topic(topic_path)
        except ydb.SchemeError:
            pass

        driver.topic_client.create_topic(topic_path, consumers=[consumer])

        # Write messages
        with driver.topic_client.writer(topic_path) as writer:
            for i in range(10):
                writer.write(f"message-{i}")

        # Read messages
        with driver.topic_client.reader(topic_path, consumer=consumer) as reader:
            for _ in range(10):
                try:
                    message = reader.receive_message(timeout=5)
                    print(message.data.decode())
                    reader.commit(message)
                except TimeoutError:
                    break

Asynchronous: Write and Read Concurrently
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

..
code-block:: python import asyncio import ydb endpoint = "grpc://localhost:2136" database = "/local" topic_path = "/local/my-topic" consumer = "my-consumer" async def write_messages(driver: ydb.aio.Driver): async with driver.topic_client.writer(topic_path) as writer: for i in range(10): await writer.write(ydb.TopicWriterMessage( data=f"message-{i}", metadata_items={"index": str(i)}, )) async def read_messages(driver: ydb.aio.Driver): async with driver.topic_client.reader(topic_path, consumer=consumer) as reader: while True: try: message = await asyncio.wait_for(reader.receive_message(), timeout=5) print(message.data.decode(), message.metadata_items) reader.commit(message) except asyncio.TimeoutError: return async def main(): async with ydb.aio.Driver(endpoint=endpoint, database=database) as driver: await driver.wait(timeout=5, fail_fast=True) # Create topic try: await driver.topic_client.drop_topic(topic_path) except ydb.SchemeError: pass await driver.topic_client.create_topic(topic_path, consumers=[consumer]) await asyncio.gather(write_messages(driver), read_messages(driver)) asyncio.run(main()) Table Service ============= The Table service is the legacy API for schema management and bulk data operations. Use it when you need operations that are not available through YQL — creating tables with fine-grained partitioning, bulk loading data, streaming full table scans, or managing secondary indexes programmatically. For running queries use :doc:`query` instead. The Table service does not replace the Query service; the two are complementary. The Table service is accessed through ``driver.table_client``: .. code-block:: python import ydb with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver: driver.wait(timeout=5, fail_fast=True) client = driver.table_client Schema Management ----------------- Creating a Table ^^^^^^^^^^^^^^^^ Build a :class:`~ydb.TableDescription` and pass it to ``create_table``: .. 
code-block:: python driver.table_client.create_table( "/local/users", ydb.TableDescription() .with_columns( ydb.Column("id", ydb.PrimitiveType.Uint64), ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)), ydb.Column("age", ydb.OptionalType(ydb.PrimitiveType.Uint32)), ) .with_primary_keys("id"), ) Primary key columns must not be wrapped in :class:`~ydb.OptionalType` — they cannot be ``NULL``. All other columns should use :class:`~ydb.OptionalType` so rows with missing values are accepted. Dropping a Table ^^^^^^^^^^^^^^^^ .. code-block:: python driver.table_client.drop_table("/local/users") Describing a Table ^^^^^^^^^^^^^^^^^^ ``describe_table`` returns a :class:`~ydb.TableSchemeEntry` with column definitions, indexes, TTL settings, and partition information: .. code-block:: python entry = driver.table_client.describe_table("/local/users") for column in entry.columns: print(column.name, column.type) for index in entry.indexes: print(index.name, index.index_columns) Pass :class:`~ydb.DescribeTableSettings` to request additional detail: .. code-block:: python settings = ydb.DescribeTableSettings().with_include_shard_key_bounds(True) entry = driver.table_client.describe_table("/local/users", settings) Altering a Table ^^^^^^^^^^^^^^^^ ``alter_table`` accepts keyword arguments for each type of modification. Pass only the arguments you need — unspecified arguments are left unchanged: **Add or remove columns:** .. code-block:: python driver.table_client.alter_table( "/local/users", add_columns=[ydb.Column("email", ydb.OptionalType(ydb.PrimitiveType.Utf8))], drop_columns=["age"], ) **Add or remove secondary indexes:** .. code-block:: python driver.table_client.alter_table( "/local/users", add_indexes=[ ydb.TableIndex("name_idx").with_index_columns("name"), ], drop_indexes=["old_idx"], ) **Modify TTL:** .. 
code-block:: python

    driver.table_client.alter_table(
        "/local/users",
        set_ttl_settings=ydb.TtlSettings().with_date_type_column("expires_at"),
    )

    # Remove TTL
    driver.table_client.alter_table(
        "/local/users",
        drop_ttl_settings=ydb.DropTtl(),
    )

**Change partitioning settings:**

.. code-block:: python

    driver.table_client.alter_table(
        "/local/events",
        alter_partitioning_settings=(
            ydb.PartitioningSettings()
            .with_min_partitions_count(4)
            .with_max_partitions_count(256)
        ),
    )

Copying and Renaming Tables
^^^^^^^^^^^^^^^^^^^^^^^^^^^

``copy_table`` copies a single table:

.. code-block:: python

    driver.table_client.copy_table(
        "/local/users",
        "/local/users_backup",
    )

``copy_tables`` copies multiple tables atomically:

.. code-block:: python

    driver.table_client.copy_tables([
        ("/local/users", "/local/backup/users"),
        ("/local/orders", "/local/backup/orders"),
    ])

``rename_tables`` renames (or moves) tables atomically:

.. code-block:: python

    driver.table_client.rename_tables([
        ("/local/users_new", "/local/users"),
    ])

If the destination already exists, the call fails unless the rename item is created with
``replace_destination=True``; the destination is then replaced atomically. Note that table
renames use ``RenameItem``, whereas ``RenameIndexItem`` is for renaming indexes.

Secondary Indexes
-----------------

Indexes are defined via :class:`~ydb.TableIndex` when creating or altering a table.

**Global index**: built synchronously; the table is unavailable during index construction
when adding to an existing table:

.. code-block:: python

    ydb.TableIndex("name_idx").with_index_columns("name")

**Global async index**: built in the background; the table stays available:

.. code-block:: python

    ydb.TableIndex("name_idx").with_global_async_index().with_index_columns("name")

**Covered index**: stores extra columns in the index so queries can be answered without a
primary-key lookup:

.. code-block:: python

    ydb.TableIndex("name_age_idx") \
        .with_index_columns("name") \
        .with_data_columns("age", "email")

Full example: create a table with a covered index from the start:

..
code-block:: python driver.table_client.create_table( "/local/users", ydb.TableDescription() .with_columns( ydb.Column("id", ydb.PrimitiveType.Uint64), ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)), ydb.Column("email", ydb.OptionalType(ydb.PrimitiveType.Utf8)), ydb.Column("age", ydb.OptionalType(ydb.PrimitiveType.Uint32)), ) .with_primary_keys("id") .with_indexes( ydb.TableIndex("name_idx") .with_index_columns("name") .with_data_columns("email"), ), ) TTL Settings ------------ YDB can automatically delete rows when a column value indicates the row has expired. Two modes are available. **Date-type column mode** — use a ``Date``, ``Datetime``, or ``Timestamp`` column whose value is the expiry time. Rows with a value in the past are deleted: .. code-block:: python ydb.TtlSettings().with_date_type_column("expires_at") # Optional: delay deletion by N seconds after expiry ydb.TtlSettings().with_date_type_column("expires_at", expire_after_seconds=3600) **Value-since-unix-epoch mode** — use an integer column that stores time as a Unix timestamp in the specified unit: .. code-block:: python ydb.TtlSettings().with_value_since_unix_epoch( "created_ts", ydb.ColumnUnit.UNIT_SECONDS, expire_after_seconds=86400, # delete rows 24 h after created_ts ) :class:`~ydb.ColumnUnit` values: ``UNIT_SECONDS``, ``UNIT_MILLISECONDS``, ``UNIT_MICROSECONDS``, ``UNIT_NANOSECONDS``. Example — create a table where rows expire 7 days after creation: .. 
code-block:: python driver.table_client.create_table( "/local/events", ydb.TableDescription() .with_columns( ydb.Column("id", ydb.PrimitiveType.Uint64), ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)), ydb.Column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp)), ) .with_primary_keys("id") .with_ttl( ydb.TtlSettings().with_date_type_column( "created_at", expire_after_seconds=7 * 24 * 3600, ) ), ) Partitioning Settings --------------------- :class:`~ydb.PartitioningSettings` controls how the table is split into shards and how that split evolves over time: .. code-block:: python ydb.PartitioningSettings() \ .with_min_partitions_count(4) \ .with_max_partitions_count(256) \ .with_partition_size_mb(512) \ .with_partitioning_by_load(ydb.FeatureFlag.ENABLED) \ .with_partitioning_by_size(ydb.FeatureFlag.ENABLED) .. list-table:: :header-rows: 1 :widths: 40 60 * - Method - Effect * - ``with_min_partitions_count(n)`` - Table always has at least *n* shards. * - ``with_max_partitions_count(n)`` - Table never exceeds *n* shards. * - ``with_partition_size_mb(mb)`` - Target shard size; triggers a split when exceeded (requires by-size enabled). * - ``with_partitioning_by_size(flag)`` - Auto-split when a shard grows beyond the target size. * - ``with_partitioning_by_load(flag)`` - Auto-split hot shards and merge idle ones based on load. :class:`~ydb.FeatureFlag` values: ``ENABLED``, ``DISABLED``, ``UNSPECIFIED``. Pass ``PartitioningSettings`` to ``TableDescription.with_partitioning_settings``: .. 
code-block:: python driver.table_client.create_table( "/local/events", ydb.TableDescription() .with_columns( ydb.Column("id", ydb.PrimitiveType.Uint64), ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)), ) .with_primary_keys("id") .with_partitioning_settings( ydb.PartitioningSettings() .with_min_partitions_count(4) .with_max_partitions_count(64) .with_partitioning_by_load(ydb.FeatureFlag.ENABLED) ), ) **Pre-split at specific keys** — useful when the key distribution is known upfront: .. code-block:: python driver.table_client.create_table( "/local/events", ydb.TableDescription() .with_columns( ydb.Column("id", ydb.PrimitiveType.Uint64), ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)), ) .with_primary_keys("id") .with_partition_at_keys( ydb.ExplicitPartitions( [ydb.SplitPoint(1000), ydb.SplitPoint(2000), ydb.SplitPoint(3000)] ) ), ) Column Families --------------- Column families group columns for independent storage and compression settings. Useful when a table has both hot (frequently read) and cold (rarely read) columns: .. code-block:: python driver.table_client.create_table( "/local/users", ydb.TableDescription() .with_columns( ydb.Column("id", ydb.PrimitiveType.Uint64), ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)), ydb.Column("bio", ydb.OptionalType(ydb.PrimitiveType.Utf8), family="blobs"), ydb.Column("avatar", ydb.OptionalType(ydb.PrimitiveType.String), family="blobs"), ) .with_primary_keys("id") .with_column_families( ydb.ColumnFamily() .with_name("blobs") .with_compression(ydb.Compression.LZ4), ), ) :class:`~ydb.Compression` values: ``NONE``, ``LZ4``, ``UNSPECIFIED``. Bulk Upsert ----------- ``bulk_upsert`` loads rows without going through a transaction. It is faster than individual ``INSERT``/``UPSERT`` statements for large batches and is idempotent (safe to retry on failure). Build a :class:`~ydb.BulkUpsertColumns` descriptor that lists the columns and their types, then pass it together with the rows: .. 
code-block:: python from collections import namedtuple User = namedtuple("User", ["id", "name", "age"]) rows = [ User(id=1, name="Alice", age=30), User(id=2, name="Bob", age=25), User(id=3, name="Carol", age=28), ] column_types = ( ydb.BulkUpsertColumns() .add_column("id", ydb.PrimitiveType.Uint64) .add_column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("age", ydb.OptionalType(ydb.PrimitiveType.Uint32)) ) driver.table_client.bulk_upsert("/local/users", rows, column_types) Each row must be an object (named tuple, dataclass, or any object) whose attributes match the column names in ``BulkUpsertColumns``. .. note:: ``bulk_upsert`` bypasses transaction guarantees. All rows in a single call are applied atomically, but there is no cross-call ordering. Prefer ``UPSERT`` via the Query service when transactional semantics matter. Streaming Reads --------------- read_table ^^^^^^^^^^ ``read_table`` streams all rows (or a key range) of a table without a transaction. It is efficient for full-table exports and ETL pipelines: .. code-block:: python def callee(session: ydb.table.Session): with session.read_table("/local/users", columns=("id", "name")) as it: for result_set in it: for row in result_set.rows: print(row.id, row.name) driver.table_client.retry_operation_sync(callee) ``read_table`` is called on a ``Session`` object. Access a session through ``retry_operation_sync``, which handles session lifecycle and retries. **Read a key range:** .. 
code-block:: python key_type = ydb.TupleType().add_element(ydb.PrimitiveType.Uint64) key_range = ydb.KeyRange( from_bound=ydb.KeyBound.inclusive([1], key_type), to_bound=ydb.KeyBound.exclusive([1000], key_type), ) def callee(session): with session.read_table( "/local/users", key_range=key_range, columns=("id", "name"), ordered=True, ) as it: for result_set in it: for row in result_set.rows: print(row.id, row.name) driver.table_client.retry_operation_sync(callee) :class:`~ydb.KeyBound`\ ``.inclusive([value], type)`` — the bound row is included. :class:`~ydb.KeyBound`\ ``.exclusive([value], type)`` — the bound row is excluded. Pass ``None`` to ``from_bound`` or ``to_bound`` of :class:`~ydb.KeyRange` to mean "from the beginning" or "to the end" of the table. **Parameters for read_table:** .. list-table:: :header-rows: 1 :widths: 25 75 * - Parameter - Description * - ``path`` - Full path to the table. * - ``key_range`` - ``KeyRange`` to read; ``None`` reads the whole table. * - ``columns`` - Tuple of column names to return; empty tuple returns all columns. * - ``ordered`` - If ``True``, rows are returned in primary-key order (slower). * - ``row_limit`` - Maximum number of rows to return. * - ``use_snapshot`` - ``True`` to read a consistent snapshot (default server behaviour). scan_query ^^^^^^^^^^ ``scan_query`` executes a YQL query in streaming mode — the server sends result chunks as they are produced without buffering the entire result set: .. code-block:: python for result_set in driver.table_client.async_scan_query( "SELECT id, name FROM users WHERE age > 25" ): for row in result_set.rows: print(row.id, row.name) Unlike ``execute_with_retries`` from the Query service, ``scan_query`` does not buffer results and cannot be used inside a transaction. Pass parameters with a ``ScanQueryParameters`` object or a plain dict: .. 
code-block:: python for result_set in driver.table_client.async_scan_query( "DECLARE $min_age AS Uint32; SELECT id, name FROM users WHERE age > $min_age", parameters={"$min_age": (25, ydb.PrimitiveType.Uint32)}, ): for row in result_set.rows: print(row.id, row.name) Async Usage ----------- The async table client is accessed the same way via ``driver.table_client`` on an async driver. All I/O methods become coroutines: .. code-block:: python import asyncio import ydb async def main(): async with ydb.aio.Driver( endpoint="grpc://localhost:2136", database="/local" ) as driver: await driver.wait(timeout=5, fail_fast=True) # Schema operations await driver.table_client.create_table( "/local/users", ydb.TableDescription() .with_columns( ydb.Column("id", ydb.PrimitiveType.Uint64), ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)), ) .with_primary_keys("id"), ) # Bulk upsert column_types = ( ydb.BulkUpsertColumns() .add_column("id", ydb.PrimitiveType.Uint64) .add_column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)) ) rows = [...] await driver.table_client.bulk_upsert("/local/users", rows, column_types) # Describe entry = await driver.table_client.describe_table("/local/users") for col in entry.columns: print(col.name, col.type) await driver.table_client.drop_table("/local/users") asyncio.run(main()) Complete Example ---------------- Create a table with TTL, a secondary index, and custom partitioning; load data with ``bulk_upsert``; stream it back with ``read_table``: .. 
code-block:: python import ydb from collections import namedtuple Event = namedtuple("Event", ["id", "user_id", "kind", "created_at"]) with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver: driver.wait(timeout=5, fail_fast=True) # Create table driver.table_client.create_table( "/local/events", ydb.TableDescription() .with_columns( ydb.Column("id", ydb.PrimitiveType.Uint64), ydb.Column("user_id", ydb.OptionalType(ydb.PrimitiveType.Uint64)), ydb.Column("kind", ydb.OptionalType(ydb.PrimitiveType.Utf8)), ydb.Column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp)), ) .with_primary_keys("id") .with_indexes( ydb.TableIndex("user_idx").with_index_columns("user_id") ) .with_ttl( ydb.TtlSettings().with_date_type_column( "created_at", expire_after_seconds=30 * 24 * 3600, ) ) .with_partitioning_settings( ydb.PartitioningSettings() .with_min_partitions_count(4) .with_max_partitions_count(64) .with_partitioning_by_load(ydb.FeatureFlag.ENABLED) ), ) # Bulk load import datetime now = datetime.datetime.now(tz=datetime.timezone.utc) column_types = ( ydb.BulkUpsertColumns() .add_column("id", ydb.PrimitiveType.Uint64) .add_column("user_id", ydb.OptionalType(ydb.PrimitiveType.Uint64)) .add_column("kind", ydb.OptionalType(ydb.PrimitiveType.Utf8)) .add_column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp)) ) rows = [Event(i, i % 10, "click", now) for i in range(10000)] driver.table_client.bulk_upsert("/local/events", rows, column_types) # Stream all rows back def read_all(session): with session.read_table("/local/events", columns=("id", "user_id", "kind")) as it: for result_set in it: for row in result_set.rows: print(row.id, row.user_id, row.kind) driver.table_client.retry_operation_sync(read_all) # Clean up driver.table_client.drop_table("/local/events") Coordination Service ==================== .. warning:: Coordination Service API is experimental and may contain bugs. The interface may change in future releases. 
The Coordination Service provides distributed primitives for managing shared state across multiple processes or hosts: * **Semaphores** — counting semaphores that work as distributed mutexes or concurrency limiters. A semaphore with ``limit=1`` acts as an exclusive distributed lock. * **Sessions** — persistent connections to a coordination node. The server tracks session liveness; if a session drops without explicitly releasing a semaphore, the server releases it automatically after the grace period expires. A typical use case is leader election or limiting the number of concurrent workers across a cluster. Concepts -------- **Coordination node** — a named entity in the YDB schema tree (like a table path) that holds configuration and state for sessions and semaphores attached to it. Think of it as a namespace for a set of semaphores. **Session** — a stateful connection to a coordination node. Sessions survive transient network failures through automatic reconnection. When a session ends (gracefully or via timeout), all semaphores it holds are released automatically. **Semaphore** — a server-side counter with a configurable ``limit``. A call to ``acquire(count=N)`` blocks until ``N`` units are available. ``acquire()`` with the default ``count=1`` against a ``limit=1`` semaphore behaves as a mutex. Node Management --------------- Create a Node ^^^^^^^^^^^^^ A coordination node must exist before sessions can attach to it. .. code-block:: python # Minimal creation — default config driver.coordination_client.create_node("/local/my_node") Create a node with explicit configuration: .. 
code-block:: python

    import ydb.coordination

    config = ydb.coordination.NodeConfig(
        attach_consistency_mode=ydb.coordination.ConsistencyMode.STRICT,
        read_consistency_mode=ydb.coordination.ConsistencyMode.STRICT,
        rate_limiter_counters_mode=ydb.coordination.RateLimiterCountersMode.AGGREGATED,
        self_check_period_millis=1000,
        session_grace_period_millis=10000,
    )
    driver.coordination_client.create_node("/local/my_node", config)

:class:`~ydb.coordination.NodeConfig` parameters:

.. list-table::
   :header-rows: 1
   :widths: 35 65

   * - Parameter
     - Description
   * - ``attach_consistency_mode``
     - :class:`~ydb.coordination.ConsistencyMode` for session attach operations.
       ``STRICT`` — strong consistency; ``RELAXED`` — may read slightly stale state
       but lower latency.
   * - ``read_consistency_mode``
     - :class:`~ydb.coordination.ConsistencyMode` for semaphore describe/watch operations.
   * - ``rate_limiter_counters_mode``
     - :class:`~ydb.coordination.RateLimiterCountersMode` for metrics collection:
       ``AGGREGATED`` (single counter per semaphore) or ``DETAILED`` (per-owner breakdown).
   * - ``self_check_period_millis``
     - How often (ms) the server checks its own liveness. Lower values detect failures
       faster but increase server load.
   * - ``session_grace_period_millis``
     - How long (ms) the server keeps a session alive after losing contact with the
       client before forcibly releasing all its semaphores.

Describe and Alter a Node
^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    # Import the names used unqualified below
    from ydb.coordination import ConsistencyMode, NodeConfig, RateLimiterCountersMode

    # Read current config
    config = driver.coordination_client.describe_node("/local/my_node")
    print(config.attach_consistency_mode)
    print(config.session_grace_period_millis)

    # Change config
    new_config = NodeConfig(
        attach_consistency_mode=ConsistencyMode.RELAXED,
        read_consistency_mode=ConsistencyMode.RELAXED,
        rate_limiter_counters_mode=RateLimiterCountersMode.DETAILED,
        self_check_period_millis=2000,
        session_grace_period_millis=15000,
    )
    driver.coordination_client.alter_node("/local/my_node", new_config)

Delete a Node
^^^^^^^^^^^^^

.. 
code-block:: python driver.coordination_client.delete_node("/local/my_node") Sessions -------- A session binds a client to a coordination node. All semaphore operations go through a session. Sessions reconnect automatically after transient network errors. .. code-block:: python # Use as context manager — session is closed on exit with driver.coordination_client.session("/local/my_node") as session: # ... work with semaphores pass # Or manage lifecycle manually session = driver.coordination_client.session("/local/my_node") try: # ... work with semaphores pass finally: session.close() If the process crashes or the network drops, the server waits ``session_grace_period_millis`` before releasing the session's semaphores. This window gives the client time to reconnect and reclaim ownership. Semaphores ---------- Basic Usage (Mutex Pattern) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ A semaphore with ``limit=1`` acts as an exclusive distributed lock: .. code-block:: python with driver.coordination_client.session("/local/my_node") as session: # Acquire returns when no other holder exists (blocks otherwise) semaphore = session.semaphore("my_lock") semaphore.acquire() try: # critical section do_work() finally: semaphore.release() The context manager form is more concise: .. code-block:: python with driver.coordination_client.session("/local/my_node") as session: with session.semaphore("my_lock"): do_work() # lock is held for the duration of the block Counting Semaphore ^^^^^^^^^^^^^^^^^^ Set ``limit > 1`` to allow multiple concurrent holders — useful for limiting the number of parallel workers: .. 
code-block:: python with driver.coordination_client.session("/local/my_node") as session: # Allow up to 3 concurrent holders semaphore = session.semaphore("worker_slots", limit=3) semaphore.acquire() # acquire 1 slot (default count=1) try: do_work() finally: semaphore.release() # Acquire multiple slots at once semaphore.acquire(count=2) # hold 2 of the 3 slots Inspecting a Semaphore ^^^^^^^^^^^^^^^^^^^^^^ .. code-block:: python with driver.coordination_client.session("/local/my_node") as session: semaphore = session.semaphore("my_lock") info = semaphore.describe() print("limit: ", info.limit) # max count print("count: ", info.count) # currently acquired count print("owners: ", info.owners) # list of current holders print("waiters: ", info.waiters) # list of processes waiting to acquire Attaching Metadata ^^^^^^^^^^^^^^^^^^ Each semaphore can hold an opaque ``bytes`` payload. This is useful for the leader to publish its address or other metadata so that followers can discover it: .. code-block:: python with driver.coordination_client.session("/local/my_node") as session: semaphore = session.semaphore("leader_lock") semaphore.acquire() # Publish this process's address as leader metadata semaphore.update(b"host=worker-1.internal:8080") Async Usage ----------- The async client mirrors the synchronous API with ``async with`` and ``await``: .. code-block:: python import asyncio import ydb async def main(): async with ydb.aio.Driver( endpoint="grpc://localhost:2136", database="/local", ) as driver: await driver.wait(timeout=5, fail_fast=True) await driver.coordination_client.create_node("/local/my_node") async with driver.coordination_client.session("/local/my_node") as session: async with session.semaphore("my_lock"): await do_async_work() asyncio.run(main()) Async is well suited for running multiple independent workers concurrently: .. 
code-block:: python async def worker(client, worker_id: int): async with client.session("/local/my_node") as session: for i in range(5): async with session.semaphore("shared_resource"): print(f"worker {worker_id}: step {i}") await asyncio.sleep(0.1) async def main(): async with ydb.aio.Driver(...) as driver: await driver.wait(timeout=5, fail_fast=True) await driver.coordination_client.create_node("/local/my_node") await asyncio.gather(*(worker(driver.coordination_client, i) for i in range(4))) Multi-threaded Sync Usage -------------------------- The synchronous client is thread-safe. Multiple threads can share the same ``coordination_client`` and create independent sessions: .. code-block:: python import threading import ydb def worker(client, worker_id: int): with client.session("/local/my_node") as session: for i in range(5): with session.semaphore("shared_resource"): print(f"worker {worker_id}: step {i}") with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver: driver.wait(timeout=5, fail_fast=True) driver.coordination_client.create_node("/local/my_node") threads = [ threading.Thread(target=worker, args=(driver.coordination_client, i)) for i in range(4) ] for t in threads: t.start() for t in threads: t.join() Leader Election Pattern ----------------------- A common use case is electing a leader among a set of replicas. The replica that acquires the semaphore becomes the leader; others block in ``acquire()`` and take over if the leader's session expires: .. 
code-block:: python import socket import ydb def run_replica(driver: ydb.Driver): my_address = socket.gethostname() with driver.coordination_client.session("/local/election") as session: semaphore = session.semaphore("leader", limit=1) while True: semaphore.acquire() # blocks until we become leader semaphore.update(my_address.encode()) # publish our address print(f"{my_address}: I am the leader") try: serve_as_leader() # run until an error or shutdown except Exception: pass finally: semaphore.release() # step down voluntarily Scheme Service ============== The Scheme service manages the YDB path hierarchy — databases, directories, tables, topics, and other named objects. It is available through ``driver.scheme_client`` on both the synchronous and asynchronous driver. A typical use case is ensuring that a directory tree exists before creating tables, or introspecting the schema at runtime. Directories ----------- Create a Directory ^^^^^^^^^^^^^^^^^^ Directories work like filesystem folders. They are used to organise tables and other objects within a database: .. code-block:: python driver.scheme_client.make_directory("/local/my_app") # Nested directories must be created one level at a time driver.scheme_client.make_directory("/local/my_app/production") driver.scheme_client.make_directory("/local/my_app/staging") A helper to ensure an entire path exists, creating each missing segment: .. code-block:: python import os import ydb def ensure_path(driver: ydb.Driver, path: str) -> None: parts = [] head = path.rstrip("/") while head: try: if driver.scheme_client.describe_path(head).is_directory_or_database(): break except ydb.SchemeError: pass parts.append(head) head = os.path.dirname(head).rstrip("/") for p in reversed(parts): driver.scheme_client.make_directory(p) Remove a Directory ^^^^^^^^^^^^^^^^^^ The directory must be empty before it can be removed: .. 
code-block:: python driver.scheme_client.remove_directory("/local/my_app/staging") List a Directory ^^^^^^^^^^^^^^^^ ``list_directory`` returns a :class:`~ydb.SchemeEntry` for each immediate child of the path, plus a ``self`` entry describing the path itself: .. code-block:: python listing = driver.scheme_client.list_directory("/local/my_app") # listing.self — SchemeEntry for /local/my_app itself print(listing.self.name, listing.self.type) # listing.children — list of SchemeEntry for immediate children for entry in listing.children: print(entry.name, entry.type) Filter by type to find, for example, all tables in a directory: .. code-block:: python listing = driver.scheme_client.list_directory("/local/my_app") tables = [e for e in listing.children if e.is_table()] topics = [e for e in listing.children if e.type == ydb.SchemeEntryType.TOPIC] Describing Paths ---------------- ``describe_path`` returns a :class:`~ydb.SchemeEntry` for any path — table, topic, directory, coordination node, etc.: .. code-block:: python entry = driver.scheme_client.describe_path("/local/my_app/users") print(entry.name) # "users" print(entry.owner) # owner login print(entry.type) # SchemeEntryType.TABLE print(entry.size_bytes) # approximate size SchemeEntry ----------- Every scheme object is described by a :class:`~ydb.SchemeEntry`: .. list-table:: :header-rows: 1 :widths: 25 75 * - Field - Description * - ``name`` - Object name (last path component). * - ``type`` - :class:`~ydb.SchemeEntryType` enum value. * - ``owner`` - Login of the object owner. * - ``size_bytes`` - Approximate on-disk size (tables only; 0 for directories). * - ``permissions`` - Explicit ACL entries set on this object. * - ``effective_permissions`` - Effective ACL entries (including inherited from parent). **Convenience predicates:** .. 
code-block:: python entry.is_directory() # DIRECTORY entry.is_database() # DATABASE entry.is_table() # TABLE (row-oriented) entry.is_column_table() # COLUMN_TABLE (column-oriented) entry.is_column_store() # COLUMN_STORE entry.is_any_table() # TABLE or COLUMN_TABLE entry.is_directory_or_database() entry.is_coordination_node() # COORDINATION_NODE entry.is_external_table() # EXTERNAL_TABLE entry.is_external_data_source() # EXTERNAL_DATA_SOURCE entry.is_view() # VIEW entry.is_resource_pool() # RESOURCE_POOL entry.is_sysview() # SYS_VIEW **SchemeEntryType enum values:** .. list-table:: :header-rows: 1 :widths: 40 60 * - Value - Meaning * - ``SchemeEntryType.DIRECTORY`` - Directory * - ``SchemeEntryType.DATABASE`` - Database root * - ``SchemeEntryType.TABLE`` - Row-oriented table * - ``SchemeEntryType.COLUMN_TABLE`` - Column-oriented table * - ``SchemeEntryType.COLUMN_STORE`` - Column store (groups column tables) * - ``SchemeEntryType.TOPIC`` - Topic (message queue) * - ``SchemeEntryType.PERS_QUEUE_GROUP`` - Legacy PersQueue group (predecessor of Topic) * - ``SchemeEntryType.COORDINATION_NODE`` - Coordination service node * - ``SchemeEntryType.EXTERNAL_TABLE`` - External table (federated query) * - ``SchemeEntryType.EXTERNAL_DATA_SOURCE`` - External data source (federated query) * - ``SchemeEntryType.VIEW`` - View * - ``SchemeEntryType.SEQUENCE`` - Sequence (auto-increment counter) * - ``SchemeEntryType.REPLICATION`` - Async replication object * - ``SchemeEntryType.TRANSFER`` - Data transfer object * - ``SchemeEntryType.RESOURCE_POOL`` - Resource pool (workload management) * - ``SchemeEntryType.SYS_VIEW`` - System view * - ``SchemeEntryType.TYPE_UNSPECIFIED`` - Unknown or unsupported type Async Usage ----------- The async scheme client has the same methods with ``await``: .. 
code-block:: python import asyncio import ydb async def main(): async with ydb.aio.Driver( endpoint="grpc://localhost:2136", database="/local" ) as driver: await driver.wait(timeout=5, fail_fast=True) await driver.scheme_client.make_directory("/local/my_app") listing = await driver.scheme_client.list_directory("/local") for entry in listing.children: print(entry.name, entry.type) entry = await driver.scheme_client.describe_path("/local/my_app") print(entry.is_directory()) await driver.scheme_client.remove_directory("/local/my_app") asyncio.run(main()) Common Pattern: Idempotent Directory Setup ------------------------------------------ When initialising an application it is common to create the directory structure once and tolerate the case where it already exists: .. code-block:: python import ydb def setup_schema(driver: ydb.Driver, base_path: str) -> None: for subdir in ("tables", "topics", "coordination"): path = f"{base_path}/{subdir}" try: driver.scheme_client.make_directory(path) except ydb.SchemeError: pass # already exists with ydb.QuerySessionPool(driver) as pool: pool.execute_with_retries( f"CREATE TABLE IF NOT EXISTS `{base_path}/tables/events` " "(id Uint64, payload Utf8, PRIMARY KEY (id))" ) OpenTelemetry Tracing ===================== The SDK provides built-in distributed tracing via `OpenTelemetry `_. When enabled, key YDB operations — such as session creation, query execution, transaction commit/rollback, and driver initialization — produce OpenTelemetry spans. Trace context is automatically propagated to the YDB server through gRPC metadata using the `W3C Trace Context `_ standard. Tracing is **zero-cost when disabled**: the SDK uses no-op stubs by default, so there is no overhead unless you explicitly opt in. Installation ------------ OpenTelemetry packages are not included by default. Install the SDK with the ``opentelemetry`` extra: .. code-block:: sh pip install ydb[opentelemetry] This pulls in ``opentelemetry-api``. 
You will also need ``opentelemetry-sdk`` and an exporter for your tracing backend, for example: .. code-block:: sh # OTLP/gRPC exporter (works with Jaeger, Tempo, and others) pip install opentelemetry-exporter-otlp-proto-grpc Enabling Tracing ---------------- Call ``enable_tracing()`` once, **after** configuring your OpenTelemetry tracer provider and **before** creating a ``Driver``: .. code-block:: python from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource import ydb from ydb.opentelemetry import enable_tracing # 1. Set up OpenTelemetry resource = Resource(attributes={"service.name": "my-service"}) provider = TracerProvider(resource=resource) provider.add_span_processor( BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317")) ) trace.set_tracer_provider(provider) # 2. Enable YDB tracing enable_tracing() # 3. Use the SDK as usual — spans are created automatically with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver: driver.wait(timeout=5) with ydb.QuerySessionPool(driver) as pool: pool.execute_with_retries("SELECT 1") provider.shutdown() ``enable_tracing()`` accepts an optional ``tracer`` argument. If omitted, the SDK obtains a tracer named ``"ydb.sdk"`` from the global tracer provider. Repeated calls to ``enable_tracing()`` do nothing until you call ``disable_tracing()``, which removes hooks so you can reconfigure or turn instrumentation off. What Is Instrumented -------------------- The following operations produce spans: .. list-table:: :header-rows: 1 :widths: 35 20 45 * - Span Name - Kind - Description * - ``ydb.Driver.Initialize`` - INTERNAL - Driver wait / endpoint discovery. * - ``ydb.CreateSession`` - CLIENT - Creating a new query session. 
* - ``ydb.ExecuteQuery`` - CLIENT - Executing a query (including ``execute_with_retries``). * - ``ydb.BeginTransaction`` - CLIENT - Explicitly beginning a transaction via ``.begin()``. * - ``ydb.Commit`` - CLIENT - Committing an explicit transaction. * - ``ydb.Rollback`` - CLIENT - Rolling back a transaction. * - ``ydb.RunWithRetry`` - INTERNAL - Umbrella span wrapping the whole retryable block (``retry_operation_*`` / ``retry_tx_*`` / ``execute_with_retries``). * - ``ydb.Try`` - INTERNAL - A single retry attempt. From the **second** attempt onward carries ``ydb.retry.backoff_ms`` — how long the retrier slept before starting this attempt (``0`` on the skip-yield retry path: ``Aborted`` / ``BadSession`` / ``NotFound`` / ``InternalError``, where the protocol prescribes immediate retry without backoff). The very first ``ydb.Try`` omits the attribute entirely because nothing preceded it. All spans are nested under the currently active span, so wrapping your application logic in a parent span produces a complete trace tree: .. code-block:: python tracer = trace.get_tracer(__name__) with tracer.start_as_current_span("handle-request"): pool.execute_with_retries("SELECT 1") # ↳ ydb.CreateSession (if a new session is needed) # ↳ ydb.ExecuteQuery Span Attributes --------------- Every YDB RPC (CLIENT-kind) span carries these semantic attributes: .. list-table:: :header-rows: 1 :widths: 30 70 * - Attribute - Description * - ``db.system.name`` - Always ``"ydb"``. * - ``db.namespace`` - Database path (e.g. ``"/local"``). * - ``server.address`` - Host from the connection string. * - ``server.port`` - Port from the connection string. * - ``network.peer.address`` - Actual node host from the discovery endpoint map (set once the session is attached to a node). * - ``network.peer.port`` - Actual node port from the discovery endpoint map. * - ``ydb.node.dc`` - Data-center / location reported by discovery for the node (e.g. ``"vla"``, ``"sas"``). 
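
When asserting on exported spans in a test, it can help to check the attributes
listed above mechanically. The sketch below is not part of the SDK:
``missing_client_attributes`` and the sample values are hypothetical, and it assumes
you already have a span's attributes as a mapping. It treats the connection-level
attributes as required and the node-level ones as optional, since the table notes
that the latter are set only once the session is attached to a node.

.. code-block:: python

    # Attribute names taken from the table above.
    ALWAYS_SET = ("db.system.name", "db.namespace", "server.address", "server.port")
    NODE_LEVEL = ("network.peer.address", "network.peer.port", "ydb.node.dc")


    def missing_client_attributes(attributes, require_node=False):
        """Return the documented attribute names that are absent from ``attributes``."""
        expected = ALWAYS_SET + (NODE_LEVEL if require_node else ())
        return [name for name in expected if name not in attributes]


    # Hypothetical attribute values, for illustration only.
    sample = {
        "db.system.name": "ydb",
        "db.namespace": "/local",
        "server.address": "localhost",
        "server.port": 2136,
    }

    print(missing_client_attributes(sample))
    print(missing_client_attributes(sample, require_node=True))

The second call lists the node-level attributes, because the sample span has not
been attached to a node.
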
Additional attributes are set when available: .. list-table:: :header-rows: 1 :widths: 30 70 * - Attribute - Description * - ``ydb.node.id`` - YDB node that handled the request. On errors, the span also records: - ``error.type`` — ``"ydb_error"``, ``"transport_error"``, or the Python exception class name. - ``db.response.status_code`` — the YDB status code name (e.g. ``"SCHEME_ERROR"``). Trace Context Propagation ------------------------- When tracing is enabled, the SDK automatically injects trace context headers into every gRPC call to YDB using the globally configured OpenTelemetry propagator (``opentelemetry.propagate.inject``). By default, OpenTelemetry uses the `W3C Trace Context `_ propagator, which adds ``traceparent`` and ``tracestate`` headers. YDB server expects W3C Trace Context headers, so the default propagator configuration works out of the box. This allows the server to correlate client spans with server-side processing, enabling end-to-end trace visibility across the entire request path. Async Usage ----------- Tracing works identically with the async driver. Call ``enable_tracing()`` once at startup: .. code-block:: python import asyncio import ydb from ydb.opentelemetry import enable_tracing enable_tracing() async def main(): async with ydb.aio.Driver( endpoint="grpc://localhost:2136", database="/local", ) as driver: await driver.wait(timeout=5) async with ydb.aio.QuerySessionPool(driver) as pool: await pool.execute_with_retries("SELECT 1") asyncio.run(main()) Using a Custom Tracer --------------------- To use a specific tracer instead of the global one: .. code-block:: python from opentelemetry import trace my_tracer = trace.get_tracer("my.custom.tracer") enable_tracing(tracer=my_tracer) Running the Examples -------------------- The runnable script is ``examples/opentelemetry/otel_example.py`` (bank table + concurrent Serializable transactions and ``app_startup`` / ``example_tli`` application spans). 
**Start Docker (YDB or the full stack) first**, then install and run on the host — see ``examples/opentelemetry/README.md`` for the full order of commands and environment variables. **Full stack in one command** (YDB + OTLP + Tempo + Grafana; the ``otel-example`` service is built from ``examples/opentelemetry/Dockerfile`` and runs the script once): .. code-block:: sh docker compose -f examples/opentelemetry/compose-e2e.yaml up --build The first run builds the ``otel-example`` image from the local SDK source; subsequent runs reuse the cached image. Pass ``--build`` again if you change the SDK or the demo script. **Typical local run** (YDB in Docker, script on the host — Compose **before** ``pip`` / ``python``): .. code-block:: sh docker compose up -d pip install -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt python examples/opentelemetry/otel_example.py Open `http://localhost:3000 `_ (Grafana) to explore traces via Tempo. YDB Types ========= YDB has its own type system. When passing parameters to queries you can let the SDK infer the YDB type from the Python value automatically, or declare the type explicitly when you need precise control. Implicit Type Mapping --------------------- For most common cases, pass a plain Python value and the SDK will pick the right type: .. list-table:: :header-rows: 1 :widths: 30 30 40 * - Python value - YDB type inferred - Notes * - ``bool`` - ``Bool`` - * - ``int`` - ``Int64`` - * - ``float`` - ``Double`` - * - ``str`` - ``Utf8`` - * - ``bytes`` - ``String`` - * - ``dict`` - ``Json`` - serialised to JSON text * - ``list`` / ``tuple`` - ``Json`` - serialised to JSON text .. 
code-block:: python

    pool.execute_with_retries(
        "DECLARE $name AS Utf8; SELECT * FROM users WHERE name = $name",
        parameters={"$name": "Alice"},  # str → Utf8
    )

    pool.execute_with_retries(
        "DECLARE $active AS Bool; SELECT * FROM users WHERE active = $active",
        parameters={"$active": True},  # bool → Bool
    )

Explicit Types
--------------

When the automatic mapping is ambiguous or you need a type that cannot be inferred,
declare the type explicitly using one of two forms.

Tuple form ``(value, type)``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    parameters={
        "$id": (42, ydb.PrimitiveType.Uint64),
    }

``ydb.TypedValue``
^^^^^^^^^^^^^^^^^^

.. code-block:: python

    parameters={
        "$id": ydb.TypedValue(42, ydb.PrimitiveType.Uint64),
    }

Both forms are equivalent. The tuple form is more concise; :class:`~ydb.TypedValue`
is more explicit and works well with type checkers.

PrimitiveType Reference
-----------------------

Integers
^^^^^^^^

.. list-table::
   :header-rows: 1
   :widths: 30 20 50

   * - Type
     - Range
     - Notes
   * - ``PrimitiveType.Bool``
     - true / false
     -
   * - ``PrimitiveType.Int8``
     - −128 … 127
     -
   * - ``PrimitiveType.Uint8``
     - 0 … 255
     -
   * - ``PrimitiveType.Int16``
     - −32 768 … 32 767
     -
   * - ``PrimitiveType.Uint16``
     - 0 … 65 535
     -
   * - ``PrimitiveType.Int32``
     - −2³¹ … 2³¹−1
     -
   * - ``PrimitiveType.Uint32``
     - 0 … 2³²−1
     -
   * - ``PrimitiveType.Int64``
     - −2⁶³ … 2⁶³−1
     - default for ``int``
   * - ``PrimitiveType.Uint64``
     - 0 … 2⁶⁴−1
     -

Floating point
^^^^^^^^^^^^^^

.. list-table::
   :header-rows: 1
   :widths: 30 70

   * - Type
     - Notes
   * - ``PrimitiveType.Float``
     - 32-bit IEEE 754
   * - ``PrimitiveType.Double``
     - 64-bit IEEE 754; default for Python ``float``

Text and binary
^^^^^^^^^^^^^^^

.. 
list-table:: :header-rows: 1 :widths: 30 70 * - Type - Notes * - ``PrimitiveType.Utf8`` - UTF-8 encoded text; default for Python ``str`` * - ``PrimitiveType.String`` - arbitrary bytes; default for Python ``bytes`` * - ``PrimitiveType.Json`` - JSON text stored as ``Utf8`` * - ``PrimitiveType.JsonDocument`` - JSON stored in a binary representation optimised for field access * - ``PrimitiveType.Yson`` - YSON binary/text format (YDB-specific) * - ``PrimitiveType.DyNumber`` - arbitrary-precision decimal stored as text Date and time ^^^^^^^^^^^^^ .. list-table:: :header-rows: 1 :widths: 30 70 * - Type - Notes * - ``PrimitiveType.Date`` - calendar date; maps to/from ``datetime.date`` * - ``PrimitiveType.Datetime`` - date + time without timezone (second precision) * - ``PrimitiveType.Timestamp`` - date + time without timezone (microsecond precision); maps to/from ``datetime.datetime`` * - ``PrimitiveType.Interval`` - duration; maps to/from ``datetime.timedelta`` * - ``PrimitiveType.Date32`` - extended-range date (supports negative years) * - ``PrimitiveType.Datetime64`` - extended-range datetime (second precision) * - ``PrimitiveType.Timestamp64`` - extended-range timestamp (microsecond precision) * - ``PrimitiveType.Interval64`` - extended-range interval Other ^^^^^ .. list-table:: :header-rows: 1 :widths: 30 70 * - Type - Notes * - ``PrimitiveType.UUID`` - UUID stored as two 64-bit integers; maps to/from ``uuid.UUID`` .. code-block:: python import datetime import ydb pool.execute_with_retries( "DECLARE $ts AS Timestamp; SELECT * FROM events WHERE created_at > $ts", parameters={ "$ts": ydb.TypedValue( datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc), ydb.PrimitiveType.Timestamp, ) }, ) Collection Types ---------------- OptionalType ^^^^^^^^^^^^ Wraps any type to allow ``None`` values. Use when a parameter or column is nullable: .. 
code-block:: python

    # Nullable Utf8 — pass None or a string
    ydb.OptionalType(ydb.PrimitiveType.Utf8)

    pool.execute_with_retries(
        "DECLARE $nickname AS Utf8?; "
        "INSERT INTO users (id, nickname) VALUES (1, $nickname)",
        parameters={"$nickname": (None, ydb.OptionalType(ydb.PrimitiveType.Utf8))},
    )

ListType
^^^^^^^^

A homogeneous ordered sequence:

.. code-block:: python

    ydb.ListType(ydb.PrimitiveType.Int64)

    pool.execute_with_retries(
        "DECLARE $ids AS List<Int64>; "
        "SELECT * FROM users WHERE id IN $ids",
        parameters={"$ids": ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64))},
    )

DictType
^^^^^^^^

A map from a key type to a value type:

.. code-block:: python

    # Dict<Utf8, Int64>
    ydb.DictType(ydb.PrimitiveType.Utf8, ydb.PrimitiveType.Int64)

    parameters={
        "$scores": (
            {"alice": 10, "bob": 20},
            ydb.DictType(ydb.PrimitiveType.Utf8, ydb.PrimitiveType.Int64),
        )
    }

StructType
^^^^^^^^^^

A named record — useful for passing rows as parameters:

.. code-block:: python

    row_type = (
        ydb.StructType()
        .add_member("id", ydb.PrimitiveType.Uint64)
        .add_member("name", ydb.PrimitiveType.Utf8)
    )

    pool.execute_with_retries(
        "DECLARE $row AS Struct<id: Uint64, name: Utf8>; "
        "INSERT INTO users (id, name) VALUES ($row.id, $row.name)",
        parameters={
            "$row": ({"id": 1, "name": "Alice"}, row_type)
        },
    )

Combine :class:`~ydb.ListType` + :class:`~ydb.StructType` for bulk inserts:

.. code-block:: python

    row_type = (
        ydb.StructType()
        .add_member("id", ydb.PrimitiveType.Uint64)
        .add_member("name", ydb.PrimitiveType.Utf8)
    )
    rows_type = ydb.ListType(row_type)

    rows = [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]

    pool.execute_with_retries(
        "DECLARE $rows AS List<Struct<id: Uint64, name: Utf8>>; "
        "INSERT INTO users SELECT id, name FROM AS_TABLE($rows)",
        parameters={"$rows": (rows, rows_type)},
    )

TupleType
^^^^^^^^^

A positional (unnamed) record. Build it by chaining ``add_element``:

.. 
code-block:: python

    point_type = (
        ydb.TupleType()
        .add_element(ydb.PrimitiveType.Double)  # x
        .add_element(ydb.PrimitiveType.Double)  # y
    )

    parameters={"$point": ([1.0, 2.0], point_type)}

DecimalType
^^^^^^^^^^^

Fixed-precision decimal number:

.. code-block:: python

    # Decimal(22, 9) — 22 total digits, 9 after the decimal point (default)
    ydb.DecimalType()

    # Custom precision
    ydb.DecimalType(precision=10, scale=2)

Reading Values from Result Sets
--------------------------------

When you read from a result set, the SDK converts YDB values back to Python objects:

.. list-table::
   :header-rows: 1
   :widths: 30 70

   * - YDB type
     - Python value returned
   * - ``Bool``
     - ``bool``
   * - ``Int*/Uint*``
     - ``int``
   * - ``Float`` / ``Double``
     - ``float``
   * - ``Utf8``, ``Json``, ``JsonDocument``
     - ``str``
   * - ``String``, ``Yson``
     - ``bytes``
   * - ``Date``
     - ``datetime.date``
   * - ``Datetime`` / ``Datetime64``
     - ``datetime.datetime``
   * - ``Timestamp`` / ``Timestamp64``
     - ``datetime.datetime``
   * - ``Interval`` / ``Interval64``
     - ``datetime.timedelta``
   * - ``UUID``
     - ``uuid.UUID``
   * - ``Decimal``
     - ``decimal.Decimal``
   * - ``Optional``
     - the inner value, or ``None``

.. code-block:: python

    result_sets = pool.execute_with_retries(
        "SELECT id, name, created_at, score FROM users WHERE id = 1"
    )
    row = result_sets[0].rows[0]

    row["id"]          # int
    row["name"]        # str (Utf8 column)
    row["created_at"]  # datetime.datetime (Timestamp column)
    row["score"]       # float (Double column)

.. note::

   ``String`` columns return ``bytes``, not ``str``. If you store text in a ``String``
   column, decode it explicitly: ``row["col"].decode("utf-8")``. Prefer ``Utf8`` for
   text data.

Errors and Retries
==================

The SDK maps every YDB server status code to a Python exception class. All exceptions
inherit from :class:`~ydb.Error`, so you can catch the whole hierarchy with a single
``except ydb.Error`` clause, or handle individual error types precisely.
Exception Hierarchy ------------------- .. code-block:: text ydb.Error ├── ydb.BadRequest — malformed query or invalid argument ├── ydb.Unauthorized — authentication succeeded but operation not permitted ├── ydb.Unauthenticated — authentication failed ├── ydb.InternalError — server-side internal error (usually transient) ├── ydb.Aborted — transaction aborted due to a conflict; safe to retry ├── ydb.Unavailable — service temporarily unavailable; safe to retry ├── ydb.Overloaded — server is overloaded; retry with backoff ├── ydb.SchemeError — schema-related error (e.g. table not found, already exists) ├── ydb.GenericError — unclassified server error ├── ydb.Timeout — server-side deadline exceeded ├── ydb.BadSession — session is invalid or expired; create a new one ├── ydb.PreconditionFailed — operation precondition not met ├── ydb.AlreadyExists — object already exists (DDL) ├── ydb.NotFound — object not found ├── ydb.SessionExpired — session TTL expired ├── ydb.Cancelled — operation was cancelled ├── ydb.Undetermined — outcome is unknown (e.g. network lost before ack) ├── ydb.Unsupported — operation not supported by this server version ├── ydb.SessionBusy — session is executing another request ├── ydb.ExternalError — error in an external data source ├── ydb.TruncatedResponseError — result set was truncated by the server ├── ydb.SessionPoolEmpty — all sessions are busy; pool exhausted ├── ydb.SessionPoolClosed — session pool has been stopped ├── ydb.ConnectionError — base class for transport-level errors │ ├── ydb.ConnectionFailure — could not establish connection │ ├── ydb.ConnectionLost — connection dropped mid-request │ └── ydb.Unimplemented — server does not support this RPC └── ydb.DeadlineExceed — client-side deadline exceeded Catching Errors --------------- .. 
code-block:: python import ydb try: pool.execute_with_retries("SELECT * FROM users") except ydb.SchemeError: print("table does not exist") except ydb.Unauthorized: print("access denied") except ydb.Unavailable: print("service temporarily unavailable") except ydb.Error as e: print(f"other YDB error: {e}") Each exception exposes: * ``str(e)``: human-readable message from the server. * ``e.issues``: list of structured ``IssueMessage`` objects with ``message``, ``issue_code``, and ``severity``. * ``e.status``: the ``ydb.StatusCode`` enum value. .. code-block:: python try: pool.execute_with_retries("BAD QUERY") except ydb.Error as e: print(e.status) # e.g. StatusCode.BAD_REQUEST print(e.message) if e.issues: for issue in e.issues: print(issue.message, issue.severity) Retriable vs Non-Retriable Errors ---------------------------------- Not every error is worth retrying. The SDK classifies errors into four groups: **Always retriable (fast backoff):** * :class:`~ydb.Unavailable`: service is temporarily unavailable. * ``ClientInternalError``: internal SDK error. * ``SessionExpired``: session TTL passed; the SDK opens a new one. * :class:`~ydb.NotFound`: retried by default (configurable via ``retry_not_found``). * ``Cancelled``: only when ``retry_cancelled=True`` is set. **Retriable with slow backoff:** * :class:`~ydb.Aborted`: transaction conflict; the whole transaction must be replayed. * :class:`~ydb.BadSession`: session is invalid; the SDK acquires a new one. * :class:`~ydb.Overloaded`: server under heavy load; back off and try again. * :class:`~ydb.SessionPoolEmpty`: all pool sessions busy; wait and retry. * :class:`~ydb.ConnectionError` / :class:`~ydb.ConnectionLost`: network issues; reconnect and retry. **Retriable only for idempotent operations (slow backoff):** * :class:`~ydb.Undetermined`: outcome unknown; only safe to retry if the operation is idempotent (i.e. repeating it has the same effect as running it once).
Set ``idempotent=True`` in :class:`~ydb.RetrySettings` to enable. **Never retried:** * :class:`~ydb.BadRequest`, :class:`~ydb.Unauthorized`, ``Unauthenticated``, :class:`~ydb.SchemeError`, :class:`~ydb.AlreadyExists`, ``Unsupported``, :class:`~ydb.Timeout`, ``PreconditionFailed``, ``ExternalError`` — these indicate a problem with the query, credentials, or schema that won't resolve by retrying. RetrySettings ------------- All pool methods that perform retries (``execute_with_retries``, ``retry_operation_sync``, ``retry_tx_sync``) accept an optional :class:`~ydb.RetrySettings` object: .. code-block:: python import ydb retry = ydb.RetrySettings( max_retries=5, # max retry attempts (default: 10) idempotent=False, # set True to also retry Undetermined errors retry_cancelled=False, # set True to retry Cancelled errors ) pool.execute_with_retries("SELECT 1", retry_settings=retry) BackoffSettings ^^^^^^^^^^^^^^^ :class:`~ydb.RetrySettings` uses two separate backoff curves: * **fast backoff** — used for errors expected to clear quickly (:class:`~ydb.Unavailable`, ``SessionExpired``, etc.). * **slow backoff** — used for errors where the server needs more breathing room (:class:`~ydb.Overloaded`, :class:`~ydb.Aborted`, connection failures, etc.). Both are instances of :class:`~ydb.BackoffSettings`: .. code-block:: python fast = ydb.BackoffSettings( ceiling=10, # exponent cap: max wait slot = 2^ceiling * slot_duration slot_duration=0.005, # base time unit in seconds (default: 5 ms) uncertain_ratio=0.5, # fraction of the window that is randomised (jitter) ) slow = ydb.BackoffSettings( ceiling=6, slot_duration=1.0, # 1 second base (default) uncertain_ratio=0.5, ) retry = ydb.RetrySettings( max_retries=10, fast_backoff_settings=fast, slow_backoff_settings=slow, ) The actual sleep duration for retry ``n`` is: .. 
code-block:: text slots = 2 ^ min(n, ceiling) max_ms = slots * slot_duration * 1000 sleep = max_ms * (random() * uncertain_ratio + (1 - uncertain_ratio)) / 1000 Error Callback ^^^^^^^^^^^^^^ To log or instrument every retry attempt, pass ``on_ydb_error_callback``: .. code-block:: python import logging logger = logging.getLogger(__name__) def log_retry(err: ydb.Error): logger.warning("YDB error, will retry: %s", err) retry = ydb.RetrySettings( max_retries=10, on_ydb_error_callback=log_retry, ) pool.execute_with_retries("SELECT 1", retry_settings=retry) ``@ydb_retry`` Decorator ------------------------ For functions that run outside a session pool, use the ``@ydb.ydb_retry`` decorator. It wraps both synchronous and asynchronous functions: .. code-block:: python import ydb @ydb.ydb_retry(max_retries=5, idempotent=True) def fetch_user(driver: ydb.Driver, user_id: int): with ydb.QuerySessionPool(driver) as pool: result = pool.execute_with_retries( "SELECT name FROM users WHERE id = $id", parameters={"$id": user_id}, ) return result[0].rows[0]["name"] # Async version — the decorator detects coroutines automatically: @ydb.ydb_retry(max_retries=5, idempotent=True) async def fetch_user_async(driver: ydb.aio.Driver, user_id: int): async with ydb.aio.QuerySessionPool(driver) as pool: result = await pool.execute_with_retries( "SELECT name FROM users WHERE id = $id", parameters={"$id": user_id}, ) return result[0].rows[0]["name"] .. note:: The decorator retries the **entire function** on failure, not just the individual query. Only use ``idempotent=True`` when repeating the full function body is safe. Handling ``Undetermined`` -------------------------- :class:`~ydb.Undetermined` means the network was lost before the server could confirm whether the operation succeeded or failed. The server may have applied the write — or not. For **read-only** queries this is always safe to retry. 
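Why a blind retry is risky for writes can be shown without a database. The sketch below is not SDK code: ``AckLost``, ``send`` and ``run_with_retry`` are hypothetical stand-ins for :class:`~ydb.Undetermined` and a retry loop, and a plain ``dict`` plays the role of the table. It assumes the worst case, where the first attempt is applied but its acknowledgement is lost.

.. code-block:: python

    class AckLost(Exception):
        """Hypothetical stand-in for ydb.Undetermined in this sketch."""


    def send(write, table, lose_ack):
        """Apply `write` to `table`, then optionally lose the acknowledgement."""
        write(table)
        if lose_ack:
            raise AckLost("write applied, but the client never heard back")


    def run_with_retry(write, table):
        """Retry once after a lost acknowledgement, as a blind retry would."""
        try:
            send(write, table, lose_ack=True)   # first attempt: applied, ack lost
        except AckLost:
            send(write, table, lose_ack=False)  # retry without knowing the outcome
        return table


    def upsert_event(table):
        # UPSERT-like: writing the same key twice leaves a single row.
        table[42] = "payload"


    def append_event(table):
        # Not idempotent: every execution adds a row under a fresh key.
        table[len(table)] = "payload"


    upserted = run_with_retry(upsert_event, {})
    appended = run_with_retry(append_event, {})
    print(len(upserted))  # 1
    print(len(appended))  # 2

The keyed write ends in the same state however many times it runs, while the appending write leaves a duplicate row behind. That difference is what the ``idempotent`` flag asks you to vouch for.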
For **writes**, only retry if you can tolerate duplicates or the query is naturally idempotent (e.g. ``UPSERT``): .. code-block:: python retry = ydb.RetrySettings( max_retries=10, idempotent=True, # enables retry on Undetermined ) # Safe: UPSERT is idempotent pool.execute_with_retries( "UPSERT INTO events (id, data) VALUES (42, 'payload')", retry_settings=retry, ) # Unsafe: INSERT will fail with AlreadyExists on the second attempt — # but that error is not retriable, so at worst you get an exception. pool.execute_with_retries( "INSERT INTO events (id, data) VALUES (42, 'payload')", retry_settings=retry, ) Common Patterns --------------- Fail fast on connection errors during startup: .. code-block:: python try: driver.wait(timeout=5, fail_fast=True) except TimeoutError: raise SystemExit("Could not reach YDB — check endpoint and credentials") Distinguish schema errors (fix the code) from transient errors (retry): .. code-block:: python try: pool.execute_with_retries("SELECT * FROM nonexistent_table") except ydb.SchemeError as e: raise RuntimeError(f"Schema problem: {e}") from e except ydb.Error: pass # handled by retry logic inside execute_with_retries YDB API Reference ================= .. toctree:: :caption: Contents: .. module:: ydb Driver ------ DriverConfig ^^^^^^^^^^^^ .. autoclass:: ydb.DriverConfig :members: :inherited-members: :undoc-members: :exclude-members: database, ca_cert, channel_options, secure_channel, endpoint, endpoints, credentials, use_all_nodes, root_certificates, certificate_chain, private_key, grpc_keep_alive_timeout, table_client_settings, primary_user_agent Driver ^^^^^^ .. autoclass:: ydb.Driver :members: :inherited-members: :undoc-members: Driver (AsyncIO) ^^^^^^^^^^^^^^^^ .. autoclass:: ydb.aio.Driver :members: :inherited-members: :undoc-members: ------------------------ Credentials ----------- AnonymousCredentials ^^^^^^^^^^^^^^^^^^^^ .. 
autoclass:: ydb.AnonymousCredentials :members: :undoc-members: AccessTokenCredentials ^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.AccessTokenCredentials :members: :undoc-members: StaticCredentials ^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.StaticCredentials :members: :undoc-members: ------------------------ Common ------ BaseRequestSettings ^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.BaseRequestSettings :members: :inherited-members: :undoc-members: :exclude-members: trace_id, request_type, timeout, cancel_after, operation_timeout, compression, need_rpc_auth, headers, make_copy, tracer RetrySettings ^^^^^^^^^^^^^ .. autoclass:: ydb.RetrySettings :members: :inherited-members: :undoc-members: BackoffSettings ^^^^^^^^^^^^^^^ .. autoclass:: ydb.BackoffSettings :members: :undoc-members: Result Sets ^^^^^^^^^^^ .. autoclass:: ydb.convert._ResultSet :members: :inherited-members: :undoc-members: ------------------------ Types ----- PrimitiveType ^^^^^^^^^^^^^ .. autoclass:: ydb.PrimitiveType :members: :undoc-members: TypedValue ^^^^^^^^^^ .. autoclass:: ydb.TypedValue :members: :undoc-members: OptionalType ^^^^^^^^^^^^ .. autoclass:: ydb.OptionalType :members: :undoc-members: ListType ^^^^^^^^ .. autoclass:: ydb.ListType :members: :undoc-members: DictType ^^^^^^^^ .. autoclass:: ydb.DictType :members: :undoc-members: StructType ^^^^^^^^^^ .. autoclass:: ydb.StructType :members: :undoc-members: TupleType ^^^^^^^^^ .. autoclass:: ydb.TupleType :members: :undoc-members: DecimalType ^^^^^^^^^^^ .. autoclass:: ydb.DecimalType :members: :undoc-members: ------------------------ Query Service ------------- QueryClientSettings ^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.QueryClientSettings :members: :inherited-members: :undoc-members: QuerySessionPool ^^^^^^^^^^^^^^^^ .. autoclass:: ydb.QuerySessionPool :members: :inherited-members: :undoc-members: QuerySession ^^^^^^^^^^^^ .. autoclass:: ydb.QuerySession :members: :inherited-members: :undoc-members: QueryTxContext ^^^^^^^^^^^^^^ .. 
autoclass:: ydb.QueryTxContext :members: :inherited-members: :undoc-members: QuerySessionPool (AsyncIO) ^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.aio.QuerySessionPool :members: :inherited-members: :undoc-members: QuerySession (AsyncIO) ^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.aio.QuerySession :members: :inherited-members: :undoc-members: QueryTxContext (AsyncIO) ^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.aio.QueryTxContext :members: :inherited-members: :undoc-members: Transaction Modes ^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.BaseQueryTxMode :members: :inherited-members: :undoc-members: :exclude-members: name, to_proto .. autoclass:: ydb.QuerySerializableReadWrite :members: :inherited-members: :undoc-members: :exclude-members: name, to_proto .. autoclass:: ydb.QuerySnapshotReadOnly :members: :inherited-members: :undoc-members: :exclude-members: name, to_proto .. autoclass:: ydb.QuerySnapshotReadWrite :members: :inherited-members: :undoc-members: :exclude-members: name, to_proto .. autoclass:: ydb.QueryOnlineReadOnly :members: :inherited-members: :undoc-members: :exclude-members: name, to_proto .. autoclass:: ydb.QueryStaleReadOnly :members: :inherited-members: :undoc-members: :exclude-members: name, to_proto Query Settings ^^^^^^^^^^^^^^ .. autoclass:: ydb.QueryStatsMode :members: :undoc-members: .. autoclass:: ydb.QueryResultSetFormat :members: :undoc-members: .. autoclass:: ydb.QueryExplainResultFormat :members: :undoc-members: .. autoclass:: ydb.ArrowFormatSettings :members: :undoc-members: ------------------------ Table Service ------------- TableClient ^^^^^^^^^^^ .. autoclass:: ydb.TableClient :members: :inherited-members: :undoc-members: TableClientSettings ^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.TableClientSettings :members: :inherited-members: :undoc-members: Session Pool ^^^^^^^^^^^^ .. autoclass:: ydb.SessionPool :members: :inherited-members: :undoc-members: Session ^^^^^^^ .. 
autoclass:: ydb.Session :members: :inherited-members: :undoc-members: Transaction Context ^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.TxContext :members: :inherited-members: :undoc-members: DataQuery ^^^^^^^^^ .. autoclass:: ydb.DataQuery :members: :inherited-members: :undoc-members: TableDescription ^^^^^^^^^^^^^^^^ .. autoclass:: ydb.TableDescription :members: :undoc-members: Column ^^^^^^ .. autoclass:: ydb.Column :members: :undoc-members: TableIndex ^^^^^^^^^^ .. autoclass:: ydb.TableIndex :members: :undoc-members: TableSchemeEntry ^^^^^^^^^^^^^^^^ .. autoclass:: ydb.TableSchemeEntry :members: :undoc-members: DescribeTableSettings ^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.DescribeTableSettings :members: :undoc-members: KeyRange ^^^^^^^^ .. autoclass:: ydb.KeyRange :members: :undoc-members: KeyBound ^^^^^^^^ .. autoclass:: ydb.KeyBound :members: :undoc-members: BulkUpsertColumns ^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.BulkUpsertColumns :members: :undoc-members: TtlSettings ^^^^^^^^^^^ .. autoclass:: ydb.TtlSettings :members: :undoc-members: PartitioningSettings ^^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.PartitioningSettings :members: :undoc-members: ColumnFamily ^^^^^^^^^^^^ .. autoclass:: ydb.ColumnFamily :members: :undoc-members: FeatureFlag ^^^^^^^^^^^ .. autoclass:: ydb.FeatureFlag :members: :undoc-members: ColumnUnit ^^^^^^^^^^ .. autoclass:: ydb.ColumnUnit :members: :undoc-members: Compression ^^^^^^^^^^^ .. autoclass:: ydb.Compression :members: :undoc-members: -------------------------- Topic Service ------------- TopicClient ^^^^^^^^^^^ .. autoclass:: ydb.TopicClient :members: :inherited-members: :undoc-members: TopicClientSettings ^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.TopicClientSettings :members: :inherited-members: :undoc-members: TopicConsumer ^^^^^^^^^^^^^ .. autoclass:: ydb.TopicConsumer :members: :undoc-members: TopicCodec ^^^^^^^^^^ .. autoclass:: ydb.TopicCodec :members: :undoc-members: TopicWriterMessage ^^^^^^^^^^^^^^^^^^ .. 
autoclass:: ydb.TopicWriterMessage :members: :undoc-members: TopicReaderSelector ^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.TopicReaderSelector :members: :undoc-members: TopicAutoPartitioningSettings ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.TopicAutoPartitioningSettings :members: :undoc-members: TopicAutoPartitioningStrategy ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.TopicAutoPartitioningStrategy :members: :undoc-members: -------------------------- Coordination Service -------------------- NodeConfig ^^^^^^^^^^ .. autoclass:: ydb.coordination.NodeConfig :members: :undoc-members: ConsistencyMode ^^^^^^^^^^^^^^^ .. autoclass:: ydb.coordination.ConsistencyMode :members: :undoc-members: RateLimiterCountersMode ^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: ydb.coordination.RateLimiterCountersMode :members: :undoc-members: -------------------------- Scheme ------ SchemeClient ^^^^^^^^^^^^ .. autoclass:: ydb.SchemeClient :members: :inherited-members: :undoc-members: SchemeEntry ^^^^^^^^^^^ .. autoclass:: ydb.SchemeEntry :members: :undoc-members: SchemeEntryType ^^^^^^^^^^^^^^^ .. autoclass:: ydb.SchemeEntryType :members: :undoc-members: -------------------------- Errors ------ .. autoclass:: ydb.Error :members: :undoc-members: .. autoclass:: ydb.BadRequest :undoc-members: .. autoclass:: ydb.Unauthorized :undoc-members: .. autoclass:: ydb.Unauthenticated :undoc-members: .. autoclass:: ydb.Aborted :undoc-members: .. autoclass:: ydb.Unavailable :undoc-members: .. autoclass:: ydb.Overloaded :undoc-members: .. autoclass:: ydb.SchemeError :undoc-members: .. autoclass:: ydb.GenericError :undoc-members: .. autoclass:: ydb.Timeout :undoc-members: .. autoclass:: ydb.BadSession :undoc-members: .. autoclass:: ydb.AlreadyExists :undoc-members: .. autoclass:: ydb.NotFound :undoc-members: .. autoclass:: ydb.Undetermined :undoc-members: .. autoclass:: ydb.ConnectionError :undoc-members: .. autoclass:: ydb.ConnectionFailure :undoc-members: .. 
autoclass:: ydb.ConnectionLost :undoc-members: .. autoclass:: ydb.SessionPoolEmpty :undoc-members: .. autoclass:: ydb.DeadlineExceed :undoc-members: