YDB Python SDK
==============
Python client for `YDB `_ — a fault-tolerant distributed SQL database.
.. toctree::
   :hidden:
   :caption: Getting Started

   overview
   quickstart

.. toctree::
   :hidden:
   :caption: Connecting

   driver

.. toctree::
   :hidden:
   :caption: Services

   query
   topic
   table
   coordination
   scheme

.. toctree::
   :hidden:
   :caption: Observability

   opentelemetry

.. toctree::
   :hidden:
   :caption: Reference

   types
   errors
   apireference

New to the SDK?
---------------
Start with :doc:`quickstart` — it shows how to install the package, connect to a
database, and run your first query in under five minutes.
Then read :doc:`driver` to understand how to configure the connection: which endpoint
and database to use, how to authenticate, and how to set up TLS. Every service in the
SDK is accessed through a ``Driver`` instance, so this page is the foundation for
everything else.
LLM-ready Documentation
-----------------------
For AI-assisted coding, use `llms.txt `_
as the documentation index and
`llms-full.txt `_
as the complete documentation context.
Running Queries
---------------
The :doc:`query` page covers the Query service — the primary API for executing YQL
statements. It explains ``QuerySessionPool``, how to run DDL and DML, how to pass
parameters safely, and how to work with transactions. Start here if you want to read
or write data.
The :doc:`types` page is a companion to query: it shows how Python values map to YDB
types and how to specify types explicitly when the automatic mapping is not enough.
Messaging
---------
The :doc:`topic` page covers the Topic service — a persistent message queue similar to
Kafka. It explains how to create topics, write messages, read and commit them, and how
topics integrate with transactions. Both synchronous and asynchronous patterns are
covered.
Table Service
-------------
The :doc:`table` page covers ``driver.table_client`` — the lower-level API for
operations that cannot be expressed in YQL: creating tables with custom partitioning,
TTL, secondary indexes, and column families; bulk loading data with ``bulk_upsert``;
and streaming full-table reads with ``read_table`` or ``scan_query``. Use this
alongside the Query service when you need fine-grained schema or data-loading control.
Distributed Coordination
------------------------
The :doc:`coordination` page covers distributed semaphores and leader election. If you
need to limit concurrent access to a shared resource across multiple processes or hosts,
this is the service to use.
Schema Management
-----------------
The :doc:`scheme` page covers ``driver.scheme_client`` — creating and removing
directories, listing directory contents, and describing any path in the YDB hierarchy
(tables, topics, coordination nodes, etc.).
Error Handling and Retries
--------------------------
The :doc:`errors` page is important reading before going to production. YDB returns
structured error codes, and the SDK's retry logic depends on them. This page explains
which errors are safe to retry, which are not, how to tune backoff settings, and how to
use the ``@ydb_retry`` decorator. Skipping this section is a common source of production
incidents.
Observability
-------------
The :doc:`opentelemetry` page explains how to add distributed tracing to your
application using OpenTelemetry. One call to ``enable_tracing()`` instruments
query sessions, transactions, and connection pool operations — so you can
visualize request flow in Jaeger, Grafana, or any OpenTelemetry-compatible backend.
API Reference
-------------
The :doc:`apireference` page contains auto-generated documentation for all public
classes and methods.
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
Overview
========
Project Homepage
----------------
YDB Python SDK is hosted on GitHub at https://github.com/ydb-platform/ydb-python-sdk under the ydb-platform organization.
Releases and project status are available on PyPI at https://pypi.org/project/ydb.
The most recent published version of this documentation is available at https://ydb-platform.github.io/ydb-python-sdk.
Community
---------
You can ask questions in the official Telegram chats: `EN `_ | `RU `_.
Bugs and feature enhancements to YDB Python SDK should be reported on the `GitHub
issue tracker
`_.
Quick Start
===========
Installation
------------
Prerequisites
^^^^^^^^^^^^^
* Python 3.8 or higher
* ``pip`` version 9.0.1 or higher

If necessary, upgrade your version of ``pip``::
python -m pip install --upgrade pip
If you cannot upgrade ``pip`` because the installation is owned by the system, you can run the example in a virtualenv::
python -m pip install virtualenv
virtualenv venv
source venv/bin/activate
python -m pip install --upgrade pip
Installation via PyPI
^^^^^^^^^^^^^^^^^^^^^
To install the YDB Python SDK from PyPI, run the following command::
pip install ydb
Installation From Sources
^^^^^^^^^^^^^^^^^^^^^^^^^
To install the YDB Python SDK from sources, run the following command from the root of the repository::
pip install -e .
Usage
-----
Import Package
^^^^^^^^^^^^^^
.. code-block:: python
import ydb
Driver Initialization
^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
endpoint = "grpc://localhost:2136" # your ydb endpoint
database = "/local" # your ydb database
with ydb.Driver(
endpoint=endpoint,
database=database,
credentials=ydb.credentials_from_env_variables(),
) as driver:
driver.wait(timeout=5, fail_fast=True)
SessionPool Initialization
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
with ydb.QuerySessionPool(driver) as pool:
pass
Query Execution
^^^^^^^^^^^^^^^
The Python SDK executes queries written in YQL.
There are two primary methods for executing queries, each with different properties and use cases:
* ``pool.execute_with_retries``:

  * Buffers the entire result set in client memory.
  * Automatically retries execution when a retriable error occurs.
  * Does not allow specifying a transaction execution mode.
  * Recommended for one-off queries that are expected to produce small result sets.

* ``tx.execute``:

  * Returns an iterator over the query results, so results that do not fit into client memory can still be processed.
  * Retries must be handled manually via ``pool.retry_operation_sync``.
  * Allows specifying a transaction execution mode.
  * Recommended for scenarios where ``pool.execute_with_retries`` is insufficient.

Usage of ``pool.execute_with_retries()``:
.. code-block:: python
    pool.execute_with_retries("DROP TABLE IF EXISTS example")
    pool.execute_with_retries("CREATE TABLE example(key UInt64, value String, PRIMARY KEY (key))")

    pool.execute_with_retries("INSERT INTO example (key, value) VALUES (1, 'luffy')")

    # Each result set exposes its rows; columns are read by name.
    res = pool.execute_with_retries("SELECT COUNT(*) AS rows_count FROM example")
    print(res[0].rows[0]["rows_count"])  # 1

Example of ``tx.execute()``:
.. code-block:: python
def callee(session: ydb.QuerySessionSync):
with session.transaction() as tx:
with tx.execute(
"INSERT INTO example (key, value) VALUES (2, 'zoro')"
):
pass
with tx.execute(
"INSERT INTO example (key, value) VALUES (3, 'sanji')",
commit_tx=True,
):
pass
pool.retry_operation_sync(callee)
Driver
======
The driver is the entry point for all YDB operations. It manages endpoint discovery,
connection pooling, and load balancing. All service clients (query, topic, table) are
obtained through the driver.
DriverConfig
------------
``DriverConfig`` holds all settings needed to initialize a driver. Pass it to the
``Driver`` constructor, or use shorthand keyword arguments directly on the ``Driver``.
**Constructor parameters:**
.. list-table::
:header-rows: 1
:widths: 25 15 60
* - Parameter
- Default
- Description
* - ``endpoint``
- *required*
- gRPC endpoint: ``grpc://host:port`` (plain) or ``grpcs://host:port`` (TLS).
* - ``database``
- ``None``
- Full database path, e.g. ``/local`` or ``/ru-central1/b1g.../etn...``.
* - ``credentials``
- ``None``
- Credentials instance. Falls back to :class:`~ydb.AnonymousCredentials` if ``None``.
* - ``root_certificates``
- ``None``
- PEM-encoded CA certificate(s) as ``bytes`` for TLS verification.
* - ``certificate_chain``
- ``None``
- PEM-encoded client certificate chain as ``bytes`` (mutual TLS).
* - ``private_key``
- ``None``
- PEM-encoded client private key as ``bytes`` (mutual TLS).
* - ``grpc_keep_alive_timeout``
- ``None``
- gRPC KeepAlive timeout in milliseconds.
* - ``disable_discovery``
- ``False``
- If ``True``, skip endpoint discovery and use only the initial endpoint.
* - ``discovery_request_timeout``
- ``10``
- Timeout in seconds for each discovery request.
* - ``table_client_settings``
- ``None``
- :class:`~ydb.TableClientSettings` instance to configure the Table service client.
* - ``topic_client_settings``
- ``None``
- :class:`~ydb.TopicClientSettings` instance to configure the Topic service client.
* - ``query_client_settings``
- ``None``
- :class:`~ydb.QueryClientSettings` instance to configure the Query service client.
* - ``compression``
- ``None``
- gRPC-level compression (e.g. ``grpc.Compression.Gzip``).
Creating a Driver
-----------------
There are three equivalent ways to specify the connection target:
**Using a DriverConfig object:**
.. code-block:: python
import ydb
config = ydb.DriverConfig(
endpoint="grpc://localhost:2136",
database="/local",
credentials=ydb.credentials_from_env_variables(),
)
driver = ydb.Driver(config)
**Using keyword arguments directly on Driver:**
.. code-block:: python
driver = ydb.Driver(
endpoint="grpc://localhost:2136",
database="/local",
credentials=ydb.credentials_from_env_variables(),
)
**Using a connection string:**
.. code-block:: python
driver = ydb.Driver(
connection_string="grpc://localhost:2136/?database=/local",
)
The connection string format is ``<protocol>://<host>:<port>/?database=<database>``.
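
To make the format concrete, the sketch below splits a connection string into the ``endpoint`` and ``database`` values used elsewhere on this page. It relies only on the standard library; ``split_connection_string`` is a hypothetical helper for illustration and is not part of the SDK.

.. code-block:: python

    from typing import Tuple
    from urllib.parse import parse_qs, urlsplit


    def split_connection_string(connection_string: str) -> Tuple[str, str]:
        """Split a connection string into (endpoint, database)."""
        parts = urlsplit(connection_string)
        if parts.scheme not in ("grpc", "grpcs"):
            raise ValueError(f"unsupported scheme: {parts.scheme!r}")

        query = parse_qs(parts.query)
        if "database" not in query:
            raise ValueError("connection string has no 'database' parameter")

        # The endpoint is the scheme plus host:port, without path or query.
        endpoint = f"{parts.scheme}://{parts.netloc}"
        return endpoint, query["database"][0]


    endpoint, database = split_connection_string("grpc://localhost:2136/?database=/local")
    print(endpoint, database)  # grpc://localhost:2136 /local

A helper like this can be handy for logging the target of a connection without printing the whole string.
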
Waiting for the Driver to Connect
----------------------------------
After construction the driver starts endpoint discovery in the background.
Call ``wait()`` before issuing any requests:
.. code-block:: python
try:
driver.wait(timeout=5, fail_fast=True)
except TimeoutError:
raise RuntimeError("Could not connect to YDB")
* ``timeout`` — how long to wait in seconds.
* ``fail_fast=True`` — raise immediately if discovery fails instead of retrying until timeout.
Closing the Driver
------------------
Always close the driver when done to release gRPC connections:
.. code-block:: python
driver.stop()
Using the driver as a context manager is the recommended pattern — ``stop()`` is called
automatically on exit:
.. code-block:: python
with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
driver.wait(timeout=5, fail_fast=True)
# ... use driver
Async Driver
------------
``ydb.aio.Driver`` mirrors the synchronous ``Driver`` with async methods:
.. code-block:: python
import asyncio
import ydb
async def main():
async with ydb.aio.Driver(
endpoint="grpc://localhost:2136",
database="/local",
credentials=ydb.credentials_from_env_variables(),
) as driver:
await driver.wait(timeout=5, fail_fast=True)
# ... use driver
asyncio.run(main())
Credentials
-----------
AnonymousCredentials
^^^^^^^^^^^^^^^^^^^^
No authentication. Use for local or unauthenticated deployments
(`full example `_):
.. code-block:: python
credentials = ydb.AnonymousCredentials()
AccessTokenCredentials
^^^^^^^^^^^^^^^^^^^^^^
Pass a static IAM token or API key directly
(`full example `_):
.. code-block:: python
credentials = ydb.AccessTokenCredentials("your-token")
StaticCredentials (username/password)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(`full example `_)
.. code-block:: python
credentials = ydb.StaticCredentials.from_user_password("user", "password")
Service Account Credentials
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Authenticate as a Yandex Cloud service account using a key file. Requires the ``ydb[yc]``
extra (``pip install ydb[yc]``)
(`full example `_):
.. code-block:: python
import os
import ydb.iam
credentials = ydb.iam.ServiceAccountCredentials.from_file(
os.getenv("SA_KEY_FILE"),
)
Metadata Credentials
^^^^^^^^^^^^^^^^^^^^^
Picks up credentials from the instance metadata service when running inside Yandex Cloud
(Compute VM, Cloud Functions, etc.). Requires ``ydb[yc]``
(`full example `_):
.. code-block:: python
import ydb.iam
credentials = ydb.iam.MetadataUrlCredentials()
OAuth 2.0 Token Exchange
^^^^^^^^^^^^^^^^^^^^^^^^^
For federated identity scenarios. Exchanges a subject token (e.g. a signed JWT) for a
YDB access token via an OAuth 2.0 token exchange endpoint
(`full example `_):
.. code-block:: python
import ydb.oauth2_token_exchange
credentials = ydb.oauth2_token_exchange.Oauth2TokenExchangeCredentials(
token_endpoint="https://auth.example.com/oauth/token",
audience="ydb",
subject_token_source=ydb.oauth2_token_exchange.JwtTokenSource(
signing_method="RS256",
private_key_file="/path/to/private.pem",
key_id="my-key-id",
issuer="my-issuer",
subject="my-subject",
audience="ydb",
),
)
Credentials from environment variables
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The recommended option for cloud deployments — the SDK auto-selects the appropriate
provider based on which environment variable is set:
.. list-table::
:header-rows: 1
:widths: 40 60
* - Environment variable
- Credentials type selected
* - ``YDB_ACCESS_TOKEN_CREDENTIALS``
- ``AccessTokenCredentials``
* - ``YDB_ANONYMOUS_CREDENTIALS``
- ``AnonymousCredentials``
* - ``YDB_METADATA_CREDENTIALS``
- ``MetadataUrlCredentials``
* - ``YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS``
- ``ServiceAccountCredentials``
.. code-block:: python
credentials = ydb.credentials_from_env_variables()
.. note::
``ServiceAccountCredentials`` and ``MetadataUrlCredentials`` require the ``ydb[yc]``
extra:
.. code-block:: sh
pip install ydb[yc]
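
The table above can be turned into a small startup diagnostic that reports which credentials type to expect before the driver is created. This is only a sketch: ``expected_credentials_type`` is a hypothetical helper, the lookup order is an assumption, and treating any non-empty value as "set" is a simplification. The SDK's own selection logic is authoritative.

.. code-block:: python

    import os
    from typing import Mapping, Optional

    # Variable name -> credentials type, copied from the table above.
    # The order of this dict is an assumption, not documented SDK behavior.
    ENV_TO_CREDENTIALS = {
        "YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS": "ServiceAccountCredentials",
        "YDB_ANONYMOUS_CREDENTIALS": "AnonymousCredentials",
        "YDB_METADATA_CREDENTIALS": "MetadataUrlCredentials",
        "YDB_ACCESS_TOKEN_CREDENTIALS": "AccessTokenCredentials",
    }


    def expected_credentials_type(env: Mapping[str, str] = os.environ) -> Optional[str]:
        """Return the credentials type the table predicts, or None if nothing is set."""
        for name, credentials_type in ENV_TO_CREDENTIALS.items():
            if env.get(name):
                return credentials_type
        return None


    print(expected_credentials_type({"YDB_ACCESS_TOKEN_CREDENTIALS": "token"}))
    # AccessTokenCredentials

Logging the result at startup makes a misconfigured deployment easier to spot than a later authentication error.
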
TLS / Secure Connections
-------------------------
For TLS endpoints (``grpcs://``), the SDK uses the system CA bundle by default.
To supply a custom CA certificate:
.. code-block:: python
with open("ca.pem", "rb") as f:
ca_cert = f.read()
driver = ydb.Driver(
endpoint="grpcs://ydb.example.com:2135",
database="/production/mydb",
credentials=ydb.credentials_from_env_variables(),
root_certificates=ca_cert,
)
For mutual TLS (client certificate authentication):
.. code-block:: python
driver = ydb.Driver(
endpoint="grpcs://ydb.example.com:2135",
database="/production/mydb",
root_certificates=ca_cert,
certificate_chain=client_cert_pem,
private_key=client_key_pem,
)
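
The TLS parameters take PEM data as ``bytes``, so the certificate and key files have to be read in binary mode first. The following is a minimal sketch of such a loader; ``read_pem`` and the file names in the usage note are hypothetical.

.. code-block:: python

    from pathlib import Path


    def read_pem(path: str) -> bytes:
        """Read a PEM file as raw bytes and do a cheap sanity check."""
        data = Path(path).read_bytes()
        # Every PEM block starts with a "-----BEGIN ..." line.
        if b"-----BEGIN " not in data:
            raise ValueError(f"{path} does not look like a PEM file")
        return data

With this helper, the values in the mutual TLS example could be prepared as ``ca_cert = read_pem("ca.pem")``, ``client_cert_pem = read_pem("client.pem")`` and ``client_key_pem = read_pem("client.key")``.
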
Service Clients
---------------
Once the driver is ready, access service clients via its properties:
.. list-table::
:header-rows: 1
:widths: 30 30 40
* - Attribute
- Type
- Purpose
* - ``driver.table_client``
- :class:`~ydb.TableClient`
- Schema management, bulk upsert, streaming reads. See :doc:`table`.
* - ``driver.topic_client``
- :class:`~ydb.TopicClient`
- Topic writers, readers, topic management. See :doc:`topic`.
* - ``driver.scheme_client``
- :class:`~ydb.SchemeClient`
- Directory and path operations. See :doc:`scheme`.
:class:`~ydb.QuerySessionPool` is constructed explicitly from the driver, not
accessed as a property:
.. code-block:: python
with ydb.Driver(endpoint=..., database=...) as driver:
driver.wait(timeout=5, fail_fast=True)
with ydb.QuerySessionPool(driver) as pool: # Query service
pass
topics = driver.topic_client # Topic service
scheme = driver.scheme_client # Schema operations
table = driver.table_client # Table service
Query Service
=============
The Query service is the primary API for executing YQL queries in YDB.
It supports DDL (``CREATE TABLE``, ``DROP TABLE``) and DML (``SELECT``, ``INSERT``,
``UPDATE``, ``DELETE``) in a single unified interface with full transaction support.
QuerySessionPool
----------------
:class:`~ydb.QuerySessionPool` manages a pool of server-side sessions and provides safe,
retriable execution of queries. This is the main entry point for the Query service.
**Create a pool:**
.. code-block:: python
    import ydb

    with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
        driver.wait(timeout=5, fail_fast=True)

        with ydb.QuerySessionPool(driver) as pool:
            ...  # use pool

The pool is closed automatically when used as a context manager. Call ``pool.stop()``
explicitly if not using ``with``.
**Async pool:**
.. code-block:: python
    import asyncio
    import ydb

    async def main():
        async with ydb.aio.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
            await driver.wait(timeout=5, fail_fast=True)

            async with ydb.aio.QuerySessionPool(driver) as pool:
                ...  # use pool

    asyncio.run(main())

Session Acquire Timeout
^^^^^^^^^^^^^^^^^^^^^^^
When the pool is exhausted (all sessions are in use), ``acquire()`` and ``checkout()``
block until a session becomes free. The default behavior is to wait indefinitely.
**Wait indefinitely (default):**
.. code-block:: python
session = pool.acquire() # blocks until a session is available
session = pool.acquire(timeout=None) # explicit, same behavior
**Fail immediately if no session is available:**
.. code-block:: python
from ydb import SessionPoolEmpty
try:
session = pool.acquire(timeout=0)
except SessionPoolEmpty:
# no session was available right now
...
**Fail after a deadline:**
.. code-block:: python
try:
session = pool.acquire(timeout=5.0)
except SessionPoolEmpty:
# no session became available within 5 seconds
...
The same ``timeout`` parameter is available on ``checkout()``:
.. code-block:: python
with pool.checkout(timeout=5.0) as session:
...
When using the retry helpers, set ``max_session_acquire_timeout`` in
:class:`~ydb.RetrySettings` to apply a per-call timeout across all retry attempts:
.. code-block:: python
import ydb
settings = ydb.RetrySettings(max_session_acquire_timeout=5.0)
pool.retry_tx_sync(callee, retry_settings=settings)
.. warning::
**Never call a retry helper from inside another retry helper on the same pool.**
Each retry helper holds a session for the duration of the call. If the inner call
also needs a session and the pool is full, both sides wait for each other and the
program deadlocks silently.
.. code-block:: python
# WRONG — if pool size == 1 (or all sessions are taken), this deadlocks:
def outer(tx):
pool.retry_tx_sync(inner) # tries to acquire a second session while one is held
pool.retry_tx_sync(outer)
If you need auxiliary data, load it before entering the outer retry call, or use a
separate pool.
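
One way to follow this advice is sketched below: the auxiliary read finishes before the retry helper is entered, and the callee touches only ``tx``. The ``settings`` and ``accounts`` tables, their columns, and both function names are hypothetical. The parameter declarations assume that Python ``int`` and ``str`` values map to ``Int64`` and ``Utf8``.

.. code-block:: python

    def load_default_plan(pool):
        """Auxiliary read. Its session is back in the pool when this returns."""
        result_sets = pool.execute_with_retries(
            "SELECT value FROM settings WHERE name = 'default_plan'"  # hypothetical table
        )
        return result_sets[0].rows[0]["value"]


    def create_account(pool, account_id, name):
        # Step 1: load auxiliary data while this code path holds no session.
        plan = load_default_plan(pool)

        # Step 2: the callee uses only ``tx`` and never ``pool``, so it cannot
        # wait for a second session from the same pool.
        def callee(tx):
            with tx.execute(
                "DECLARE $id AS Int64; DECLARE $name AS Utf8; DECLARE $plan AS Utf8; "
                "UPSERT INTO accounts (id, name, plan) VALUES ($id, $name, $plan)",
                parameters={"$id": account_id, "$name": name, "$plan": plan},
            ):
                pass

        pool.retry_tx_sync(callee)

The trade-off is that the auxiliary read happens outside the transaction. If it must be consistent with the write, run it through ``tx.execute`` inside the callee instead.
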
execute_with_retries
--------------------
The simplest way to run a query. Buffers all results into memory, retries on transient
errors, and runs outside any transaction:
.. code-block:: python
# DDL
pool.execute_with_retries("CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))")
# DML
pool.execute_with_retries(
"INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')"
)
# Query with results
result_sets = pool.execute_with_retries("SELECT id, name FROM users")
for row in result_sets[0].rows:
print(row["id"], row["name"])
.. note::
``execute_with_retries`` loads the entire result set into memory before returning.
For large result sets use ``retry_operation_sync`` with streaming iteration instead.
**Async:**
.. code-block:: python
result_sets = await pool.execute_with_retries("SELECT id, name FROM users")
Parameterized Queries
---------------------
Always use parameters instead of string interpolation to avoid SQL injection and
to allow the server to cache query plans.
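
The difference is easy to see without a database. In the sketch below, the interpolated version lets a hostile value rewrite the statement, while the parameterized version keeps the query text constant and ships the value separately. ``unsafe_query`` and ``safe_call`` are hypothetical helpers, and the ``name`` filter is only an illustration.

.. code-block:: python

    def unsafe_query(name: str) -> str:
        # Anti-pattern: the value becomes part of the statement text.
        return f"SELECT * FROM users WHERE name = '{name}'"


    # The query text never changes, which is also what makes plan caching possible.
    QUERY = "DECLARE $name AS Utf8; SELECT * FROM users WHERE name = $name"


    def safe_call(name: str):
        # The value travels next to the query instead of inside it.
        return QUERY, {"$name": name}


    hostile = "x' OR '1' = '1"

    print(unsafe_query(hostile))
    # SELECT * FROM users WHERE name = 'x' OR '1' = '1'

    query, parameters = safe_call(hostile)
    print(query == QUERY, parameters)
    # True {'$name': "x' OR '1' = '1"}

The pair returned by ``safe_call`` is in the shape expected by ``pool.execute_with_retries(query, parameters=parameters)``.
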
**Implicit types** — Python values are mapped to YDB types automatically:
.. code-block:: python
pool.execute_with_retries(
"DECLARE $id AS Uint64; SELECT * FROM users WHERE id = $id",
parameters={"$id": 42},
)
**Explicit types via tuple** — pass ``(value, ydb_type)`` when the automatic mapping
is ambiguous or you need a specific type:
.. code-block:: python
    pool.execute_with_retries(
        "DECLARE $ids AS List<Int64>; SELECT * FROM users WHERE id IN $ids",
        parameters={"$ids": ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64))},
    )

**Explicit types via TypedValue:**
.. code-block:: python
pool.execute_with_retries(
"DECLARE $id AS Int64; SELECT * FROM users WHERE id = $id",
parameters={"$id": ydb.TypedValue(42, ydb.PrimitiveType.Int64)},
)
Common :class:`~ydb.PrimitiveType` values: ``Bool``, ``Int32``, ``Int64``, ``Uint32``,
``Uint64``, ``Float``, ``Double``, ``String``, ``Utf8``, ``Json``, ``Timestamp``,
``Date``, ``Datetime``.
retry_operation_sync
--------------------
Use this when you need manual control over a session — for example, to stream large
result sets or to execute multiple queries in sequence on the same session:
.. code-block:: python
def callee(session: ydb.QuerySession):
with session.execute("SELECT COUNT(*) AS cnt FROM users") as results:
for result_set in results:
print(result_set.rows[0]["cnt"])
pool.retry_operation_sync(callee)
The pool acquires a session, calls ``callee(session)``, and retries the whole call on
retriable errors (e.g. ``UNAVAILABLE``, ``ABORTED``).
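
Conceptually, the retry helper behaves like the loop below. This is a simplified model and not the SDK's implementation: ``RetriableError``, ``retry_operation``, the retry limit, and the backoff values are all invented for illustration, and session release is left out.

.. code-block:: python

    import time


    class RetriableError(Exception):
        """Hypothetical stand-in for a transient error such as UNAVAILABLE or ABORTED."""


    def retry_operation(acquire_session, callee, max_retries=3, base_delay=0.01):
        """Call ``callee(session)``, re-running the whole call on retriable errors."""
        attempt = 0
        while True:
            session = acquire_session()  # a fresh session for every attempt
            try:
                return callee(session)
            except RetriableError:
                attempt += 1
                if attempt > max_retries:
                    raise  # give up and surface the last error
                # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
                time.sleep(base_delay * 2 ** (attempt - 1))

The important property is that ``callee`` runs again from its first line on every attempt, so it should not rely on state left behind by a failed attempt.
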
**Async:**
.. code-block:: python
async def callee(session: ydb.aio.QuerySession):
async with await session.execute("SELECT COUNT(*) AS cnt FROM users") as results:
async for result_set in results:
print(result_set.rows[0]["cnt"])
await pool.retry_operation_async(callee)
retry_tx_sync
-------------
Use this to execute a group of queries atomically within a transaction.
The pool handles session acquisition, transaction begin, commit, and retries:
.. code-block:: python
def callee(tx: ydb.QueryTxContext):
with tx.execute("INSERT INTO users (id, name) VALUES (3, 'Carol')"):
pass
pool.retry_tx_sync(callee)
If ``callee`` raises a retriable error the whole function (including the transaction)
is re-executed from scratch — make sure ``callee`` is idempotent or relies on the
transaction's atomicity for correctness.
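
A common pitfall is a callee that appends to state outside the transaction: after a retry, rows from the failed attempt would appear twice. The sketch below avoids this by resetting the outer list at the start of every attempt. ``collect_user_names`` is a hypothetical helper.

.. code-block:: python

    def collect_user_names(pool):
        names = []

        def callee(tx):
            # A retry re-runs callee from the top, so drop partial results
            # left over from a failed attempt before reading again.
            names.clear()
            with tx.execute("SELECT name FROM users ORDER BY id") as results:
                for result_set in results:
                    names.extend(row["name"] for row in result_set.rows)

        pool.retry_tx_sync(callee)
        return names

Database writes inside the callee do not need this treatment, because a failed attempt's transaction is not committed. Only side effects outside the transaction have to be made repeatable by hand.
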
**Async:**
.. code-block:: python
async def callee(tx: ydb.aio.QueryTxContext):
async with await tx.execute("INSERT INTO users (id, name) VALUES (3, 'Carol')"):
pass
await pool.retry_tx_async(callee)
**With an explicit transaction mode:**
.. code-block:: python
pool.retry_tx_sync(callee, tx_mode=ydb.QuerySnapshotReadOnly())
Transactions
------------
Transaction Modes
^^^^^^^^^^^^^^^^^
.. list-table::
:header-rows: 1
:widths: 35 65
* - Mode
- Description
* - :class:`~ydb.QuerySerializableReadWrite`
- Full ACID serializable isolation. Default. Supports reads and writes.
* - :class:`~ydb.QuerySnapshotReadOnly`
- Consistent read-only snapshot taken at transaction start.
* - :class:`~ydb.QuerySnapshotReadWrite`
- Snapshot reads with write support; write conflicts are possible.
* - :class:`~ydb.QueryOnlineReadOnly`
- Each read returns the most recent data at execution time. No cross-read consistency.
* - :class:`~ydb.QueryOnlineReadOnly` ``(allow_inconsistent_reads=True)``
- Fastest reads; even individual reads may be slightly inconsistent.
* - :class:`~ydb.QueryStaleReadOnly`
- Reads may lag by fractions of a second. Each individual read is consistent.
Manual Transaction Control
^^^^^^^^^^^^^^^^^^^^^^^^^^
Use ``session.transaction()`` when you need fine-grained control:
**Synchronous:**
.. code-block:: python
def callee(session: ydb.QuerySession):
with session.transaction() as tx:
tx.begin()
with tx.execute("INSERT INTO users (id, name) VALUES (4, 'Dave')"):
pass
with tx.execute("SELECT COUNT(*) AS cnt FROM users") as results:
for result_set in results:
print("count:", result_set.rows[0]["cnt"])
tx.commit()
# tx.rollback() to discard changes instead
pool.retry_operation_sync(callee)
**Commit on last query** — avoids an extra round-trip:
.. code-block:: python
def callee(session: ydb.QuerySession):
with session.transaction() as tx:
tx.begin()
with tx.execute(
"INSERT INTO users (id, name) VALUES (5, 'Eve')",
commit_tx=True,
):
pass
pool.retry_operation_sync(callee)
**Async:**
.. code-block:: python
async def callee(session: ydb.aio.QuerySession):
async with session.transaction() as tx:
await tx.begin()
async with await tx.execute(
"INSERT INTO users (id, name) VALUES (4, 'Dave')"
):
pass
async with await tx.execute(
"SELECT COUNT(*) AS cnt FROM users"
) as results:
async for result_set in results:
print("count:", result_set.rows[0]["cnt"])
await tx.commit()
await pool.retry_operation_async(callee)
.. note::
The transaction context manager calls ``rollback()`` automatically on exit if you
did not commit explicitly.
Working with Result Sets
------------------------
``session.execute()`` and ``tx.execute()`` return an iterator of ``ResultSet`` objects.
A single query may return multiple result sets (e.g. when it contains multiple ``SELECT``
statements).
**Synchronous iteration:**
.. code-block:: python
    with session.execute("SELECT id, name FROM users ORDER BY id") as results:
        for result_set in results:
            for row in result_set.rows:
                # ``name`` is a Utf8 column in this table, so it is already a str.
                print(row["id"], row["name"])

**Async iteration:**
.. code-block:: python
    async with await session.execute("SELECT id, name FROM users") as results:
        async for result_set in results:
            for row in result_set.rows:
                # ``name`` is a Utf8 column in this table, so it is already a str.
                print(row["id"], row["name"])

**ResultSet fields:**
.. code-block:: python
result_set.rows # list of Row objects — access columns by name: row["col"]
result_set.columns # list of column descriptors with .name and .type
result_set.truncated # True if the server truncated results (use pagination)
.. note::
``String`` columns return ``bytes``; ``Utf8`` columns return ``str``.
Decode as needed: ``row["name"].decode("utf-8")``.
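
When the same code reads both ``String`` and ``Utf8`` columns, a small normalizing helper avoids calling ``.decode()`` on a value that is already a ``str``. ``as_text`` below is a hypothetical helper and not part of the SDK.

.. code-block:: python

    def as_text(value, encoding="utf-8"):
        """Return str for String (bytes) and Utf8 (str) values alike."""
        if isinstance(value, (bytes, bytearray)):
            return value.decode(encoding)
        # str values and None (a NULL column) pass through unchanged.
        return value


    print(as_text(b"luffy"), as_text("zoro"))  # luffy zoro

Inside a row loop this reads as ``print(row["id"], as_text(row["name"]))`` regardless of the column type.
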
DDL Queries
-----------
DDL is executed the same way as DML — through the pool or a session:
.. code-block:: python
pool.execute_with_retries(
"""
CREATE TABLE orders (
order_id Uint64,
user_id Uint64,
amount Double,
created Timestamp,
PRIMARY KEY (order_id)
)
"""
)
pool.execute_with_retries("DROP TABLE IF EXISTS orders")
Complete Examples
-----------------
Synchronous
^^^^^^^^^^^
.. code-block:: python
import ydb
with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
driver.wait(timeout=5, fail_fast=True)
with ydb.QuerySessionPool(driver) as pool:
# Schema
pool.execute_with_retries("DROP TABLE IF EXISTS users")
pool.execute_with_retries(
"CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
)
# Insert in a transaction
def insert(tx: ydb.QueryTxContext):
with tx.execute(
"INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
commit_tx=True,
):
pass
pool.retry_tx_sync(insert)
# Read
result_sets = pool.execute_with_retries("SELECT id, name FROM users")
for row in result_sets[0].rows:
print(row["id"], row["name"])
Asynchronous
^^^^^^^^^^^^
.. code-block:: python
import asyncio
import ydb
async def main():
async with ydb.aio.Driver(
endpoint="grpc://localhost:2136", database="/local"
) as driver:
await driver.wait(timeout=5, fail_fast=True)
async with ydb.aio.QuerySessionPool(driver) as pool:
# Schema
await pool.execute_with_retries("DROP TABLE IF EXISTS users")
await pool.execute_with_retries(
"CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
)
# Insert in a transaction
async def insert(tx: ydb.aio.QueryTxContext):
async with await tx.execute(
"INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
commit_tx=True,
):
pass
await pool.retry_tx_async(insert)
# Read
result_sets = await pool.execute_with_retries(
"SELECT id, name FROM users"
)
for row in result_sets[0].rows:
print(row["id"], row["name"])
asyncio.run(main())
Apache Arrow Format
-------------------
By default query results are returned as YDB value objects. For analytics workloads
you can request results in `Apache Arrow `_ IPC format,
which integrates directly with pandas, polars, and other columnar tools. Requires
``pyarrow`` to be installed (``pip install pyarrow``).
.. code-block:: python
import pyarrow as pa
import ydb
result_sets = pool.execute_with_retries(
"SELECT id, name, score FROM users ORDER BY id LIMIT 1000",
result_set_format=ydb.QueryResultSetFormat.ARROW,
)
for result_set in result_sets:
schema = pa.ipc.read_schema(pa.py_buffer(result_set.arrow_format_meta.schema))
batch = pa.ipc.read_record_batch(pa.py_buffer(result_set.data), schema)
df = batch.to_pandas()
print(df.head())
Arrow results can also be compressed on the wire. Pass ``arrow_format_settings`` to
choose a codec:
.. code-block:: python
settings = ydb.ArrowFormatSettings(
compression_codec=ydb.ArrowCompressionCodec(
ydb.ArrowCompressionCodecType.ZSTD,
level=10,
)
)
result_sets = pool.execute_with_retries(
"SELECT * FROM events LIMIT 100000",
result_set_format=ydb.QueryResultSetFormat.ARROW,
arrow_format_settings=settings,
)
.. note::
Arrow format is only useful when you process results with a columnar library.
For regular row-by-row processing use the default ``VALUE`` format — it has
no extra dependencies and lower overhead for small result sets.
Query Stats and Explain
-----------------------
Query Stats
^^^^^^^^^^^
YDB can return execution statistics alongside query results. Pass a :class:`~ydb.QueryStatsMode`
value as ``stats_mode`` to ``session.execute()`` or ``tx.execute()``, then read
``last_query_stats`` after the iterator is consumed:
.. code-block:: python
def callee(session: ydb.QuerySession):
with session.execute(
"SELECT * FROM users",
stats_mode=ydb.QueryStatsMode.BASIC,
):
pass # must iterate to completion for stats to be populated
print(session.last_query_stats)
pool.retry_operation_sync(callee)
The same works inside a transaction — stats reflect the last executed statement:
.. code-block:: python
def callee(tx: ydb.QueryTxContext):
with tx.execute(
"SELECT COUNT(*) AS cnt FROM users",
stats_mode=ydb.QueryStatsMode.FULL,
) as results:
for result_set in results:
print(result_set.rows[0]["cnt"])
print(tx.last_query_stats)
pool.retry_tx_sync(callee)
**Stats modes:**
.. list-table::
:header-rows: 1
:widths: 30 70
* - :class:`~ydb.QueryStatsMode` value
- What is returned
* - ``QueryStatsMode.NONE``
- No statistics (default).
* - ``QueryStatsMode.BASIC``
- Row counts and execution time per stage.
* - ``QueryStatsMode.FULL``
- Full per-operator statistics.
* - ``QueryStatsMode.PROFILE``
- Full statistics plus the query execution plan. Use this for performance
investigations.
Explain
^^^^^^^
``explain`` returns the query execution plan without actually running the query.
Use it to understand how YDB will execute a statement before sending it to
production:
.. code-block:: python
# Returns a JSON string
plan = pool.explain_with_retries("SELECT * FROM users WHERE id = 1")
print(plan)
# Returns a parsed dict
plan_dict = pool.explain_with_retries(
"SELECT * FROM users WHERE id = 1",
result_format=ydb.QueryExplainResultFormat.DICT,
)
print(plan_dict["Plan"])
You can also call ``explain`` directly on a session:
.. code-block:: python
def callee(session: ydb.QuerySession):
plan = session.explain("SELECT * FROM users WHERE id = 1")
print(plan)
pool.retry_operation_sync(callee)
Topic Service
=============
YDB Topic Service is a persistent message queue that supports multiple producers and consumers,
horizontal scaling via partitioning, and at-least-once delivery semantics.
The Python SDK provides both synchronous and asynchronous clients with parallel APIs; the asynchronous variants are awaited.
Concepts
--------
**Topic** — a named stream of messages divided into one or more partitions.
**Partition** — an ordered, append-only log. Messages within a partition are delivered in order.
**Producer** — a client that writes messages to a topic. Each producer has a ``producer_id``
that is used to deduplicate messages via sequence numbers (``seqno``).
**Consumer** — a named group of readers that share offset state. Each consumer independently
tracks its position in every partition, so multiple consumer groups can read the same topic
without interfering with each other.
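
These concepts can be pictured with a toy in-memory model of a single partition. It is a teaching sketch only: ``PartitionModel`` is hypothetical, it is not how the server is implemented, and it assumes that each producer's sequence numbers start at 1 and only grow.

.. code-block:: python

    from collections import defaultdict


    class PartitionModel:
        """Toy model of one partition: ordered log, seqno dedup, per-consumer offsets."""

        def __init__(self):
            self.log = []                       # append-only list of messages
            self.last_seqno = defaultdict(int)  # producer_id -> highest accepted seqno
            self.committed = defaultdict(int)   # consumer name -> next offset to read

        def write(self, producer_id, seqno, data):
            """Append a message unless this producer already wrote this seqno."""
            if seqno <= self.last_seqno[producer_id]:
                return False  # duplicate, e.g. a resend after a network error
            self.last_seqno[producer_id] = seqno
            self.log.append(data)
            return True

        def read(self, consumer, limit=10):
            """Return up to ``limit`` messages after the consumer's committed offset."""
            start = self.committed[consumer]
            return self.log[start:start + limit]

        def commit(self, consumer, count):
            """Advance the consumer's offset past ``count`` processed messages."""
            self.committed[consumer] += count


    partition = PartitionModel()
    partition.write("producer-1", 1, "a")
    partition.write("producer-1", 1, "a")  # resend with the same seqno is dropped
    partition.write("producer-1", 2, "b")

    partition.commit("consumer-1", len(partition.read("consumer-1")))
    print(partition.read("consumer-1"))  # []
    print(partition.read("consumer-2"))  # ['a', 'b']

The last two lines show why consumers do not interfere with each other: committing as ``consumer-1`` leaves the position of ``consumer-2`` untouched.
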
Getting a Topic Client
----------------------
The topic client is available on every driver instance via the ``topic_client`` property.
**Synchronous:**
.. code-block:: python
import ydb
with ydb.Driver(
endpoint="grpc://localhost:2136",
database="/local",
credentials=ydb.credentials_from_env_variables(),
) as driver:
driver.wait(timeout=5, fail_fast=True)
topic_client = driver.topic_client
**Asynchronous:**
.. code-block:: python
import asyncio
import ydb
async def main():
async with ydb.aio.Driver(
endpoint="grpc://localhost:2136",
database="/local",
credentials=ydb.credentials_from_env_variables(),
) as driver:
await driver.wait(timeout=5, fail_fast=True)
topic_client = driver.topic_client
Topic Management
----------------
Create a Topic
^^^^^^^^^^^^^^
.. code-block:: python
import datetime
# Synchronous
driver.topic_client.create_topic(
"/local/my-topic",
min_active_partitions=1,
max_active_partitions=10,
retention_period=datetime.timedelta(hours=24),
consumers=["my-consumer"],
)
# Asynchronous
await driver.topic_client.create_topic(
"/local/my-topic",
min_active_partitions=1,
max_active_partitions=10,
retention_period=datetime.timedelta(hours=24),
consumers=["my-consumer"],
)
Key parameters for ``create_topic``:
* ``path`` — full topic path including database prefix.
* ``min_active_partitions`` — minimum number of active partitions (default: 1).
* ``max_active_partitions`` — maximum number of partitions when auto-scaling is enabled.
* ``retention_period`` — how long messages are kept (``datetime.timedelta``).
* ``retention_storage_mb`` — maximum storage size per partition in megabytes.
* ``supported_codecs`` — list of :class:`~ydb.TopicCodec` values the topic accepts (default: RAW and GZIP).
* ``consumers`` — list of consumer names (strings) or :class:`~ydb.TopicConsumer` objects to create upfront.
* ``partition_write_speed_bytes_per_second`` — per-partition write throughput limit.
Alter a Topic
^^^^^^^^^^^^^
.. code-block:: python
# Add a consumer
driver.topic_client.alter_topic(
"/local/my-topic",
add_consumers=["new-consumer"],
)
# Change retention
driver.topic_client.alter_topic(
"/local/my-topic",
set_retention_period=datetime.timedelta(hours=48),
)
Drop a Topic
^^^^^^^^^^^^
.. code-block:: python
driver.topic_client.drop_topic("/local/my-topic")
Describe a Topic
^^^^^^^^^^^^^^^^
.. code-block:: python
description = driver.topic_client.describe_topic("/local/my-topic")
print(description.partitions)
print(description.consumers)
# Include partition-level statistics
description = driver.topic_client.describe_topic(
"/local/my-topic",
include_stats=True,
)
Writing Messages
----------------
Create a Writer
^^^^^^^^^^^^^^^
Use ``topic_client.writer()`` as a context manager (recommended) or create it manually.
**Synchronous:**
.. code-block:: python
with driver.topic_client.writer("/local/my-topic") as writer:
writer.write("hello")
**Asynchronous:**
.. code-block:: python
async with driver.topic_client.writer("/local/my-topic") as writer:
await writer.write("hello")
**Without context manager** (you must call ``close()`` yourself):
.. code-block:: python
writer = driver.topic_client.writer("/local/my-topic")
try:
writer.write("hello")
finally:
writer.close()
Writer Parameters
^^^^^^^^^^^^^^^^^
.. code-block:: python
writer = driver.topic_client.writer(
"/local/my-topic",
producer_id="my-producer", # Unique producer ID; auto-generated UUID if omitted.
partition_id=0, # Pin to a specific partition; None = auto-select.
codec=ydb.TopicCodec.GZIP, # Compress messages. Default: RAW (no compression).
auto_seqno=True, # Auto-increment sequence numbers (default: True).
auto_created_at=True, # Auto-set message timestamps (default: True).
)
* ``producer_id`` is used for deduplication: if the same ``(producer_id, seqno)`` pair is
received again, YDB silently skips it.
* When ``auto_seqno=True`` the SDK assigns monotonically increasing sequence numbers.
Set ``auto_seqno=False`` and provide ``seqno`` manually when you need deterministic IDs.
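
The deduplication rule can be pictured with a toy model. The sketch below is neither the SDK nor the server implementation: ``ToyPartition`` and its ``append`` method are hypothetical names, and the model simply remembers every ``(producer_id, seqno)`` pair it has already accepted.

.. code-block:: python

    from typing import List, Set, Tuple


    class ToyPartition:
        """Hypothetical in-memory stand-in for a topic partition (not a YDB API)."""

        def __init__(self) -> None:
            self.log: List[bytes] = []
            self._seen: Set[Tuple[str, int]] = set()

        def append(self, producer_id: str, seqno: int, data: bytes) -> bool:
            """Store the message unless this (producer_id, seqno) pair was seen."""
            key = (producer_id, seqno)
            if key in self._seen:
                return False  # duplicate: silently skipped
            self._seen.add(key)
            self.log.append(data)
            return True


    partition = ToyPartition()
    results = [
        partition.append("my-producer", 1, b"a"),
        partition.append("my-producer", 2, b"b"),
        partition.append("my-producer", 2, b"b"),     # retry of the same write
        partition.append("other-producer", 2, b"c"),  # different producer, same seqno
    ]

Only the retried write is dropped. The same ``seqno`` from a different ``producer_id`` is a distinct message, which is why a stable ``producer_id`` matters when a writer restarts and resends.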
Sending Messages
^^^^^^^^^^^^^^^^
**Simple write** — non-blocking; buffers the message internally and returns immediately:
.. code-block:: python
writer.write("text message")
writer.write(b"\x01\x02\x03") # bytes
writer.write(["msg-1", "msg-2"]) # send multiple messages in one call
**Full message form** with optional fields:
.. code-block:: python
writer.write(ydb.TopicWriterMessage(
data="hello",
seqno=42, # omit when auto_seqno=True
created_at=datetime.datetime.now(), # omit when auto_created_at=True
metadata_items={"key": "value"},
))
**Write and wait for server acknowledgment:**
.. code-block:: python
# Blocks until the server confirms the write.
result = writer.write_with_ack("important message")
# Get a Future and wait for it later (synchronous client).
future = writer.async_write_with_ack("important message")
future.result() # blocks here
# Async client — await directly.
result = await writer.write_with_ack("important message")
**Flush** — wait for all previously buffered messages to be acknowledged:
.. code-block:: python
for i in range(100):
writer.write(f"message-{i}")
writer.flush() # blocks until all 100 messages are acked
# Async version:
await writer.flush()
Async Write Pattern
^^^^^^^^^^^^^^^^^^^
For high-throughput pipelines with the synchronous client, issue writes with ``async_write_with_ack`` and wait on the returned futures together:
.. code-block:: python
import concurrent.futures
futures = []
for i in range(100):
future = writer.async_write_with_ack(f"message-{i}")
futures.append(future)
concurrent.futures.wait(futures)
for f in futures:
if f.exception():
raise f.exception()
Writer Backpressure
^^^^^^^^^^^^^^^^^^^
By default the writer's internal buffer is unbounded — ``write()`` always returns immediately
regardless of how many unacknowledged messages are in flight. Enable backpressure by setting
one or both limits:
.. code-block:: python
writer = driver.topic_client.writer(
"/local/my-topic",
max_buffer_size_bytes=50 * 1024 * 1024, # pause when 50 MB in flight
max_buffer_messages=1000, # pause when 1000 messages in flight
)
A message is counted as occupying the buffer from the moment it is passed to ``write()``
until the server acknowledges it. Backpressure is active when **at least one** limit is set;
setting both means either limit can trigger a wait (OR semantics).
The limits are **soft**: ``write()`` blocks only if the buffer is *already* at or above the
limit when the call starts. Once unblocked, the entire batch is admitted regardless of its
size. This means callers that batch multiple messages in a single ``write()`` call will never
deadlock even when the batch is larger than the limit.
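
The soft-limit rule can be sketched as a small predicate. This is an illustrative model built only from the description above, not the SDK's internal code; ``must_wait`` is a hypothetical helper, and its keyword arguments merely borrow the names of the writer options.

.. code-block:: python

    from typing import Optional


    def must_wait(
        in_flight_bytes: int,
        in_flight_messages: int,
        max_buffer_size_bytes: Optional[int] = None,
        max_buffer_messages: Optional[int] = None,
    ) -> bool:
        """Return True if a write() call would wait under the soft-limit rule.

        The check looks only at what is already in flight, never at the size
        of the batch being written. Either limit can trigger a wait.
        """
        over_bytes = (
            max_buffer_size_bytes is not None
            and in_flight_bytes >= max_buffer_size_bytes
        )
        over_messages = (
            max_buffer_messages is not None
            and in_flight_messages >= max_buffer_messages
        )
        return over_bytes or over_messages


    LIMIT = 1000
    in_flight = 999          # one message below the limit
    batch = ["msg"] * 50     # batch larger than the remaining room

    admitted = not must_wait(0, in_flight, max_buffer_messages=LIMIT)
    if admitted:
        in_flight += len(batch)  # the whole batch is admitted, overshooting the limit

    next_call_waits = must_wait(0, in_flight, max_buffer_messages=LIMIT)

In this model the batch is admitted because the buffer was below the limit when the call started, and only the following call has to wait.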
**Blocking behavior (default)**
When the buffer is at or above the limit, ``write()`` blocks until enough messages are
acknowledged by the server. There is no timeout by default — the call waits indefinitely:
.. code-block:: python
# Producer pauses here if the buffer is full, then proceeds once space is freed.
writer.write("message")
**Timeout**
Set ``buffer_wait_timeout_sec`` to raise :class:`~ydb.TopicWriterBufferFullError` if space
does not free up in time. Use a positive value to wait up to that many seconds, or ``0`` to
fail immediately without waiting (non-blocking):
.. code-block:: python
writer = driver.topic_client.writer(
"/local/my-topic",
max_buffer_messages=500,
buffer_wait_timeout_sec=5.0, # raise after 5 seconds; use 0 to fail immediately
)
try:
writer.write("message")
except ydb.TopicWriterBufferFullError:
# handle overload — log, drop, or apply back-off
...
**Async client**
The async writer behaves identically — ``await writer.write()`` suspends the coroutine
instead of blocking the thread:
.. code-block:: python
writer = driver.topic_client.writer(
"/local/my-topic",
max_buffer_size_bytes=4 * 1024 * 1024,
buffer_wait_timeout_sec=10.0,
)
try:
await writer.write("message")
except ydb.TopicWriterBufferFullError:
...
To apply a per-call timeout instead of configuring ``buffer_wait_timeout_sec``, wrap the call with
``asyncio.wait_for``, which raises ``asyncio.TimeoutError`` when the deadline passes:
.. code-block:: python
try:
await asyncio.wait_for(writer.write("message"), timeout=2.0)
except asyncio.TimeoutError:
... # timed out waiting for buffer space
Reading Messages
----------------
Create a Reader
^^^^^^^^^^^^^^^
A reader requires a ``consumer`` name that must already exist on the topic.
**Synchronous:**
.. code-block:: python
with driver.topic_client.reader("/local/my-topic", consumer="my-consumer") as reader:
message = reader.receive_message()
print(message.data.decode())
reader.commit(message)
**Asynchronous:**
.. code-block:: python
async with driver.topic_client.reader("/local/my-topic", consumer="my-consumer") as reader:
message = await reader.receive_message()
print(message.data.decode())
reader.commit(message)
Reader Parameters
^^^^^^^^^^^^^^^^^
.. code-block:: python
reader = driver.topic_client.reader(
topic="/local/my-topic", # str, TopicReaderSelector, or a list of these
consumer="my-consumer",
buffer_size_bytes=50 * 1024 * 1024, # client-side buffer (default: 50 MB)
buffer_release_threshold=0.5, # see below (default: 0.5)
)
``buffer_size_bytes`` controls how many bytes the server is allowed to send before the client
signals that it is ready for more. The server will not exceed this limit.
``buffer_release_threshold`` (float in ``[0.0, 1.0]``) controls when the client sends a new
``ReadRequest`` to the server after consuming messages from the local buffer:
* ``0.0`` — send a ``ReadRequest`` immediately after every batch is consumed.
Produces more round-trips when many small batches arrive.
* ``> 0.0`` — accumulate freed bytes until they reach
``threshold × buffer_size_bytes``, then send a single ``ReadRequest`` covering the
accumulated amount. This reduces network round-trips. The default is ``0.5``.
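
To see how the threshold affects the number of round-trips, the accumulate-then-release rule can be simulated. The function below is a simplified sketch based on the description above, not the reader's actual implementation; ``count_read_requests`` is a hypothetical name.

.. code-block:: python

    from typing import Iterable

    MIB = 1024 * 1024


    def count_read_requests(
        batch_sizes: Iterable[int],
        buffer_size_bytes: int,
        threshold: float,
    ) -> int:
        """Count ReadRequests sent while consuming batches of the given sizes."""
        release_at = threshold * buffer_size_bytes
        freed = 0
        requests = 0
        for size in batch_sizes:
            freed += size            # bytes freed by consuming this batch
            if freed >= release_at:  # with threshold 0.0 this is always true
                requests += 1
                freed = 0
        return requests


    batches = [MIB] * 100  # one hundred 1 MiB batches
    eager = count_read_requests(batches, 50 * MIB, 0.0)
    default = count_read_requests(batches, 50 * MIB, 0.5)

With a 50 MiB buffer, a threshold of ``0.0`` sends one request per batch (100 in total), while ``0.5`` waits until 25 MiB has been freed and sends 4.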
Example — a reader with many small messages that sends a ``ReadRequest`` once 20% of the buffer has been freed (fewer round-trips than ``0.0``, more than the default ``0.5``):
.. code-block:: python
reader = driver.topic_client.reader(
"/local/my-topic",
consumer="my-consumer",
buffer_size_bytes=50 * 1024 * 1024,
buffer_release_threshold=0.2, # send ReadRequest after freeing 10 MiB
)
To read from multiple topics at once, pass a list:
.. code-block:: python
reader = driver.topic_client.reader(
topic=["/local/topic-a", "/local/topic-b"],
consumer="my-consumer",
)
To fine-tune per-topic settings (e.g. start offset or timestamp), use :class:`~ydb.TopicReaderSelector`:
.. code-block:: python
reader = driver.topic_client.reader(
topic=ydb.TopicReaderSelector(
path="/local/my-topic",
read_from=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
),
consumer="my-consumer",
)
Receiving Messages
^^^^^^^^^^^^^^^^^^
**One message at a time:**
.. code-block:: python
# Synchronous — blocks until a message arrives.
message = reader.receive_message()
print(message.data.decode())
reader.commit(message)
# With timeout — raises TimeoutError if no message arrives within 1 second.
try:
message = reader.receive_message(timeout=1)
except TimeoutError:
print("no new messages")
# Asynchronous:
message = await reader.receive_message()
**Batch processing:**
.. code-block:: python
# Synchronous
batch = reader.receive_batch()
for message in batch.messages:
process(message)
reader.commit(batch) # commit the whole batch at once
# Limit the number of messages returned in one batch
batch = reader.receive_batch(max_messages=100)
# Asynchronous
batch = await reader.receive_batch()
for message in batch.messages:
process(message)
reader.commit(batch)
Message Fields
^^^^^^^^^^^^^^
Each received message exposes:
.. code-block:: python
message.data # bytes — the message payload
message.seqno # int — producer sequence number
message.offset # int — partition offset
message.partition_id # int — partition this message came from
message.producer_id # str — producer_id set by the writer
message.created_at # datetime — timestamp set by the writer
message.written_at # datetime — timestamp when YDB persisted the message
message.metadata_items # Dict[str, bytes] — arbitrary key-value metadata
Committing Offsets
^^^^^^^^^^^^^^^^^^
Committing tells YDB that the consumer has successfully processed a message. YDB resumes
from the last committed offset when the reader reconnects.
.. code-block:: python
# Non-blocking commit — buffered and sent in the background.
reader.commit(message)
reader.commit(batch)
# Blocking commit — waits for the server to acknowledge the commit.
reader.commit_with_ack(message)
# Async version:
await reader.commit_with_ack(message)
.. note::
``commit()`` is non-blocking and sufficient for most use cases. The offset is flushed
automatically when the reader is closed (``flush=True`` by default).
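
The effect of committing on what a reconnecting reader sees can be modelled in a few lines. This is a toy model for a single partition, not SDK code: ``ToyConsumer`` and its methods are hypothetical names.

.. code-block:: python

    from typing import List


    class ToyConsumer:
        """Hypothetical model of a consumer's committed position in one partition."""

        def __init__(self, log: List[str]) -> None:
            self.log = log
            self.next_offset = 0  # first offset delivered after a (re)connect

        def connect(self) -> List[str]:
            """Return the messages a reader would receive after connecting."""
            return self.log[self.next_offset:]

        def commit(self, offset: int) -> None:
            """Mark the message at `offset` as processed."""
            self.next_offset = max(self.next_offset, offset + 1)


    consumer = ToyConsumer(["m0", "m1", "m2", "m3"])
    first_session = consumer.connect()
    consumer.commit(0)
    consumer.commit(1)  # m2 and m3 were read but never committed
    second_session = consumer.connect()

Messages that were received but not committed are delivered again after a reconnect. This is the at-least-once behaviour, so message handlers should tolerate duplicates.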
Handling Partition Rebalancing
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When the topic rebalances (partitions are reassigned between reader instances), unfinished
messages from revoked partitions become stale. The SDK signals this via the ``alive``
property:
.. code-block:: python
# Per message
message = reader.receive_message()
# ... do some work ...
if message.alive:
reader.commit(message)
# Per batch — check before expensive processing
batch = reader.receive_batch()
for message in batch.messages:
if not batch.alive:
break # partition revoked, skip remaining messages
process(message)
if batch.alive:
reader.commit(batch)
Transactions
------------
The SDK integrates with YDB transactions so that topic writes and reads can be made
atomically together with database queries.
Transactional Write
^^^^^^^^^^^^^^^^^^^
Use ``topic_client.tx_writer(tx, topic)`` inside a transaction callback:
**Synchronous:**
.. code-block:: python
def write_with_tx(driver: ydb.Driver, topic: str):
with ydb.QuerySessionPool(driver) as pool:
def callee(tx: ydb.QueryTxContext):
tx_writer = driver.topic_client.tx_writer(tx, topic)
for i in range(10):
tx_writer.write(ydb.TopicWriterMessage(f"message-{i}"))
pool.retry_tx_sync(callee)
**Asynchronous:**
.. code-block:: python
async def write_with_tx(driver: ydb.aio.Driver, topic: str):
async with ydb.aio.QuerySessionPool(driver) as pool:
async def callee(tx: ydb.aio.QueryTxContext):
tx_writer = driver.topic_client.tx_writer(tx, topic)
for i in range(10):
await tx_writer.write(ydb.TopicWriterMessage(f"message-{i}"))
await pool.retry_tx_async(callee)
Messages written via a ``tx_writer`` are visible to readers only after the transaction commits.
If the transaction rolls back, the messages are discarded.
Transactional Read
^^^^^^^^^^^^^^^^^^
Use ``reader.receive_batch_with_tx(tx)`` to read messages inside a transaction. The commit
offset is advanced atomically with the transaction:
**Synchronous:**
.. code-block:: python
def read_with_tx(driver: ydb.Driver, topic: str, consumer: str):
with driver.topic_client.reader(topic, consumer) as reader:
with ydb.QuerySessionPool(driver) as pool:
def callee(tx: ydb.QueryTxContext):
batch = reader.receive_batch_with_tx(tx, max_messages=10)
for message in batch.messages:
process(message)
pool.retry_tx_sync(callee)
**Asynchronous:**
.. code-block:: python
async def read_with_tx(driver: ydb.aio.Driver, topic: str, consumer: str):
async with driver.topic_client.reader(topic, consumer) as reader:
async with ydb.aio.QuerySessionPool(driver) as pool:
async def callee(tx: ydb.aio.QueryTxContext):
batch = await reader.receive_batch_with_tx(tx, max_messages=10)
for message in batch.messages:
process(message)
await pool.retry_tx_async(callee)
.. note::
Do not call ``reader.commit()`` when using ``receive_batch_with_tx`` — the commit is
handled automatically by the transaction.
Auto-Partitioning
-----------------
YDB can automatically scale the number of partitions up (and optionally down) based on write
throughput. Enable it when creating a topic:
.. code-block:: python
import datetime
import ydb
driver.topic_client.create_topic(
"/local/my-topic",
consumers=["my-consumer"],
max_active_partitions=100,
auto_partitioning_settings=ydb.TopicAutoPartitioningSettings(
strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
up_utilization_percent=70, # split a partition when it reaches 70% capacity
down_utilization_percent=30, # merge partitions when below 30% (SCALE_UP_AND_DOWN only)
stabilization_window=datetime.timedelta(seconds=60),
),
)
Available :class:`~ydb.TopicAutoPartitioningStrategy` values:
* ``SCALE_UP`` — only split partitions (add capacity).
* ``SCALE_UP_AND_DOWN`` — split and merge partitions automatically.
* ``DISABLED`` — static partition count.
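
The way the strategy and the two utilization thresholds interact can be sketched as a decision function. This is a deliberately simplified model and an assumption about the general shape of the rule, not the server's algorithm: it ignores ``stabilization_window``, and ``scaling_action`` is a hypothetical helper that takes the strategy as a plain string.

.. code-block:: python

    def scaling_action(
        utilization_percent: float,
        strategy: str,
        up_utilization_percent: int = 70,
        down_utilization_percent: int = 30,
    ) -> str:
        """Return "split", "merge", or "none" for one partition (toy model)."""
        if strategy == "DISABLED":
            return "none"
        if utilization_percent >= up_utilization_percent:
            return "split"
        if (
            strategy == "SCALE_UP_AND_DOWN"
            and utilization_percent < down_utilization_percent
        ):
            return "merge"
        return "none"


    busy = scaling_action(85, "SCALE_UP")
    idle_scale_up = scaling_action(10, "SCALE_UP")
    idle_up_and_down = scaling_action(10, "SCALE_UP_AND_DOWN")

In this model an idle partition is merged only under ``SCALE_UP_AND_DOWN``, which is why ``down_utilization_percent`` has no effect with ``SCALE_UP``.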
The reader handles partition splits and merges transparently when ``auto_partitioning_support=True``
(which is the default):
.. code-block:: python
reader = driver.topic_client.reader(
"/local/my-topic",
consumer="my-consumer",
auto_partitioning_support=True, # default
)
Custom Codecs
-------------
The SDK supports RAW (no compression) and GZIP out of the box. You can register custom
encode/decode functions for your own codec IDs, which must fall in the range 10000–19999:
**Custom encoder (writer side):**
.. code-block:: python
import zlib
def my_compress(data: bytes) -> bytes:
return zlib.compress(data)
CUSTOM_CODEC = 10001 # must be in range 10000–19999
writer = driver.topic_client.writer(
"/local/my-topic",
codec=CUSTOM_CODEC,
encoders={CUSTOM_CODEC: my_compress},
)
**Custom decoder (reader side):**
.. code-block:: python
def my_decompress(data: bytes) -> bytes:
return zlib.decompress(data)
reader = driver.topic_client.reader(
"/local/my-topic",
consumer="my-consumer",
decoders={CUSTOM_CODEC: my_decompress},
)
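
Before wiring a codec pair into a writer and a reader, it is worth checking locally that the two functions are exact inverses. The sketch below does this without any YDB connection; ``is_custom_codec_id`` is a hypothetical helper that encodes the ID range mentioned in the encoder example.

.. code-block:: python

    import zlib

    CUSTOM_CODEC = 10001


    def is_custom_codec_id(codec_id: int) -> bool:
        """Check the custom codec ID range stated in the encoder example."""
        return 10000 <= codec_id <= 19999


    def my_compress(data: bytes) -> bytes:
        return zlib.compress(data)


    def my_decompress(data: bytes) -> bytes:
        return zlib.decompress(data)


    payload = b"hello topic " * 100
    encoded = my_compress(payload)
    decoded = my_decompress(encoded)

    # The reader must recover exactly what the writer was given.
    round_trip_ok = decoded == payload

A round-trip check like this catches mismatched encoder and decoder settings early, before the mismatch turns into undecodable messages in a topic.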
For CPU-intensive codecs you can offload encoding/decoding to a thread pool:
.. code-block:: python
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
writer = driver.topic_client.writer(
"/local/my-topic",
codec=CUSTOM_CODEC,
encoders={CUSTOM_CODEC: my_compress},
encoder_executor=executor,
)
Topic Client Settings
---------------------
Pass :class:`~ydb.TopicClientSettings` to the driver config to tune SDK-wide defaults:
.. code-block:: python
config = ydb.DriverConfig(
endpoint="grpc://localhost:2136",
database="/local",
)
config.topic_client_settings = ydb.TopicClientSettings(
encode_decode_threads_count=4, # threads used for codec operations (default: 1)
)
driver = ydb.Driver(config)
Complete Examples
-----------------
Synchronous: Write and Read
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
import ydb
endpoint = "grpc://localhost:2136"
database = "/local"
topic_path = "/local/my-topic"
consumer = "my-consumer"
with ydb.Driver(endpoint=endpoint, database=database) as driver:
driver.wait(timeout=5, fail_fast=True)
# Recreate the topic: drop it if it already exists, then create it
try:
driver.topic_client.drop_topic(topic_path)
except ydb.SchemeError:
pass
driver.topic_client.create_topic(topic_path, consumers=[consumer])
# Write messages
with driver.topic_client.writer(topic_path) as writer:
for i in range(10):
writer.write(f"message-{i}")
# Read messages
with driver.topic_client.reader(topic_path, consumer=consumer) as reader:
for _ in range(10):
try:
message = reader.receive_message(timeout=5)
print(message.data.decode())
reader.commit(message)
except TimeoutError:
break
Asynchronous: Write and Read Concurrently
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
import asyncio
import ydb
endpoint = "grpc://localhost:2136"
database = "/local"
topic_path = "/local/my-topic"
consumer = "my-consumer"
async def write_messages(driver: ydb.aio.Driver):
async with driver.topic_client.writer(topic_path) as writer:
for i in range(10):
await writer.write(ydb.TopicWriterMessage(
data=f"message-{i}",
metadata_items={"index": str(i)},
))
async def read_messages(driver: ydb.aio.Driver):
async with driver.topic_client.reader(topic_path, consumer=consumer) as reader:
while True:
try:
message = await asyncio.wait_for(reader.receive_message(), timeout=5)
print(message.data.decode(), message.metadata_items)
reader.commit(message)
except asyncio.TimeoutError:
return
async def main():
async with ydb.aio.Driver(endpoint=endpoint, database=database) as driver:
await driver.wait(timeout=5, fail_fast=True)
# Recreate the topic: drop it if it already exists, then create it
try:
await driver.topic_client.drop_topic(topic_path)
except ydb.SchemeError:
pass
await driver.topic_client.create_topic(topic_path, consumers=[consumer])
await asyncio.gather(write_messages(driver), read_messages(driver))
asyncio.run(main())
Table Service
=============
The Table service is the legacy API for schema management and bulk data operations.
Use it when you need operations that are not available through YQL — creating tables
with fine-grained partitioning, bulk loading data, streaming full table scans, or
managing secondary indexes programmatically.
For running queries use :doc:`query` instead. The Table service does not replace
the Query service; the two are complementary.
The Table service is accessed through ``driver.table_client``:
.. code-block:: python
import ydb
with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
driver.wait(timeout=5, fail_fast=True)
client = driver.table_client
Schema Management
-----------------
Creating a Table
^^^^^^^^^^^^^^^^
Build a :class:`~ydb.TableDescription` and pass it to ``create_table``:
.. code-block:: python
driver.table_client.create_table(
"/local/users",
ydb.TableDescription()
.with_columns(
ydb.Column("id", ydb.PrimitiveType.Uint64),
ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
ydb.Column("age", ydb.OptionalType(ydb.PrimitiveType.Uint32)),
)
.with_primary_keys("id"),
)
Primary key columns must not be wrapped in :class:`~ydb.OptionalType` — they cannot be
``NULL``. All other columns should use :class:`~ydb.OptionalType` so rows with missing
values are accepted.
Dropping a Table
^^^^^^^^^^^^^^^^
.. code-block:: python
driver.table_client.drop_table("/local/users")
Describing a Table
^^^^^^^^^^^^^^^^^^
``describe_table`` returns a :class:`~ydb.TableSchemeEntry` with column definitions, indexes,
TTL settings, and partition information:
.. code-block:: python
entry = driver.table_client.describe_table("/local/users")
for column in entry.columns:
print(column.name, column.type)
for index in entry.indexes:
print(index.name, index.index_columns)
Pass :class:`~ydb.DescribeTableSettings` to request additional detail:
.. code-block:: python
settings = ydb.DescribeTableSettings().with_include_shard_key_bounds(True)
entry = driver.table_client.describe_table("/local/users", settings)
Altering a Table
^^^^^^^^^^^^^^^^
``alter_table`` accepts keyword arguments for each type of modification. Pass
only the arguments you need — unspecified arguments are left unchanged:
**Add or remove columns:**
.. code-block:: python
driver.table_client.alter_table(
"/local/users",
add_columns=[ydb.Column("email", ydb.OptionalType(ydb.PrimitiveType.Utf8))],
drop_columns=["age"],
)
**Add or remove secondary indexes:**
.. code-block:: python
driver.table_client.alter_table(
"/local/users",
add_indexes=[
ydb.TableIndex("name_idx").with_index_columns("name"),
],
drop_indexes=["old_idx"],
)
**Modify TTL:**
.. code-block:: python
driver.table_client.alter_table(
"/local/users",
set_ttl_settings=ydb.TtlSettings().with_date_type_column("expires_at"),
)
# Remove TTL
driver.table_client.alter_table(
"/local/users",
drop_ttl_settings=ydb.DropTtl(),
)
**Change partitioning settings:**
.. code-block:: python
driver.table_client.alter_table(
"/local/events",
alter_partitioning_settings=(
ydb.PartitioningSettings()
.with_min_partitions_count(4)
.with_max_partitions_count(256)
),
)
Copying and Renaming Tables
^^^^^^^^^^^^^^^^^^^^^^^^^^^
``copy_table`` copies a single table:
.. code-block:: python
driver.table_client.copy_table(
"/local/users",
"/local/users_backup",
)
``copy_tables`` copies multiple tables atomically:
.. code-block:: python
driver.table_client.copy_tables([
("/local/users", "/local/backup/users"),
("/local/orders", "/local/backup/orders"),
])
``rename_tables`` renames (or moves) tables atomically:
.. code-block:: python
driver.table_client.rename_tables([
("/local/users_new", "/local/users"),
])
If the destination already exists, the call fails unless the rename item sets
``replace_destination=True`` (a :class:`~ydb.RenameItem` for table renames), in which
case the destination is replaced atomically.
Secondary Indexes
-----------------
Indexes are defined via :class:`~ydb.TableIndex` when creating or altering a table.
**Global index** — synchronous by default: the index is updated in the same transaction
as the table, so reads through the index are always consistent with the table data:
.. code-block:: python
ydb.TableIndex("name_idx").with_index_columns("name")
**Global async index** — the index is updated asynchronously after the table write commits; writes are cheaper, but reads through the index may briefly return stale data:
.. code-block:: python
ydb.TableIndex("name_idx").with_global_async_index().with_index_columns("name")
**Covered index** — stores extra columns in the index so queries can be answered
without a primary-key lookup:
.. code-block:: python
ydb.TableIndex("name_age_idx") \
.with_index_columns("name") \
.with_data_columns("age", "email")
Full example — create a table with a covered index from the start:
.. code-block:: python
driver.table_client.create_table(
"/local/users",
ydb.TableDescription()
.with_columns(
ydb.Column("id", ydb.PrimitiveType.Uint64),
ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
ydb.Column("email", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
ydb.Column("age", ydb.OptionalType(ydb.PrimitiveType.Uint32)),
)
.with_primary_keys("id")
.with_indexes(
ydb.TableIndex("name_idx")
.with_index_columns("name")
.with_data_columns("email"),
),
)
TTL Settings
------------
YDB can automatically delete rows when a column value indicates the row has
expired. Two modes are available.
**Date-type column mode** — use a ``Date``, ``Datetime``, or ``Timestamp`` column
whose value is the expiry time. Rows with a value in the past are deleted:
.. code-block:: python
ydb.TtlSettings().with_date_type_column("expires_at")
# Optional: delay deletion by N seconds after expiry
ydb.TtlSettings().with_date_type_column("expires_at", expire_after_seconds=3600)
**Value-since-unix-epoch mode** — use an integer column that stores time as a
Unix timestamp in the specified unit:
.. code-block:: python
ydb.TtlSettings().with_value_since_unix_epoch(
"created_ts",
ydb.ColumnUnit.UNIT_SECONDS,
expire_after_seconds=86400, # delete rows 24 h after created_ts
)
:class:`~ydb.ColumnUnit` values: ``UNIT_SECONDS``, ``UNIT_MILLISECONDS``,
``UNIT_MICROSECONDS``, ``UNIT_NANOSECONDS``.
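
The expiry moment in value-since-unix-epoch mode is the column value converted to a point in time plus ``expire_after_seconds``. The sketch below works through that arithmetic in plain Python. It is illustrative only: the unit names are used as plain strings, and ``expiry_time`` and ``is_expired`` are hypothetical helpers, not SDK functions.

.. code-block:: python

    import datetime

    EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)

    # How many column units make up one second.
    UNITS_PER_SECOND = {
        "UNIT_SECONDS": 1,
        "UNIT_MILLISECONDS": 1_000,
        "UNIT_MICROSECONDS": 1_000_000,
        "UNIT_NANOSECONDS": 1_000_000_000,
    }


    def expiry_time(value: int, unit: str, expire_after_seconds: int) -> datetime.datetime:
        """Return the moment after which a row becomes eligible for deletion."""
        microseconds = value * 1_000_000 // UNITS_PER_SECOND[unit]
        return EPOCH + datetime.timedelta(
            microseconds=microseconds, seconds=expire_after_seconds
        )


    def is_expired(value: int, unit: str, expire_after_seconds: int,
                   now: datetime.datetime) -> bool:
        return now >= expiry_time(value, unit, expire_after_seconds)


    created_ts = 1_700_000_000  # seconds since the Unix epoch
    deadline = expiry_time(created_ts, "UNIT_SECONDS", 86400)

Storing the same instant in milliseconds with ``UNIT_MILLISECONDS`` yields the same deadline. Choosing the wrong unit for a column shifts the deadline by orders of magnitude.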
Example — create a table where rows expire 7 days after creation:
.. code-block:: python
driver.table_client.create_table(
"/local/events",
ydb.TableDescription()
.with_columns(
ydb.Column("id", ydb.PrimitiveType.Uint64),
ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
ydb.Column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp)),
)
.with_primary_keys("id")
.with_ttl(
ydb.TtlSettings().with_date_type_column(
"created_at",
expire_after_seconds=7 * 24 * 3600,
)
),
)
Partitioning Settings
---------------------
:class:`~ydb.PartitioningSettings` controls how the table is split into shards and how
that split evolves over time:
.. code-block:: python
ydb.PartitioningSettings() \
.with_min_partitions_count(4) \
.with_max_partitions_count(256) \
.with_partition_size_mb(512) \
.with_partitioning_by_load(ydb.FeatureFlag.ENABLED) \
.with_partitioning_by_size(ydb.FeatureFlag.ENABLED)
.. list-table::
:header-rows: 1
:widths: 40 60
* - Method
- Effect
* - ``with_min_partitions_count(n)``
- Table always has at least *n* shards.
* - ``with_max_partitions_count(n)``
- Table never exceeds *n* shards.
* - ``with_partition_size_mb(mb)``
- Target shard size; triggers a split when exceeded (requires by-size enabled).
* - ``with_partitioning_by_size(flag)``
- Auto-split when a shard grows beyond the target size.
* - ``with_partitioning_by_load(flag)``
- Auto-split hot shards and merge idle ones based on load.
:class:`~ydb.FeatureFlag` values: ``ENABLED``, ``DISABLED``, ``UNSPECIFIED``.
Pass ``PartitioningSettings`` to ``TableDescription.with_partitioning_settings``:
.. code-block:: python
driver.table_client.create_table(
"/local/events",
ydb.TableDescription()
.with_columns(
ydb.Column("id", ydb.PrimitiveType.Uint64),
ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
)
.with_primary_keys("id")
.with_partitioning_settings(
ydb.PartitioningSettings()
.with_min_partitions_count(4)
.with_max_partitions_count(64)
.with_partitioning_by_load(ydb.FeatureFlag.ENABLED)
),
)
**Pre-split at specific keys** — useful when the key distribution is known upfront:
.. code-block:: python
driver.table_client.create_table(
"/local/events",
ydb.TableDescription()
.with_columns(
ydb.Column("id", ydb.PrimitiveType.Uint64),
ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
)
.with_primary_keys("id")
.with_partition_at_keys(
ydb.ExplicitPartitions(
[ydb.SplitPoint(1000), ydb.SplitPoint(2000), ydb.SplitPoint(3000)]
)
),
)
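
To picture what the split points mean, the sketch below maps keys to partition indexes. It assumes that each split point is the first key of the next partition, so three split points produce four partitions. ``partition_index`` is a hypothetical helper for illustration, not an SDK function.

.. code-block:: python

    import bisect
    from typing import List

    split_points = [1000, 2000, 3000]


    def partition_index(key: int, points: List[int]) -> int:
        """Return the zero-based partition that holds `key` (toy model)."""
        # bisect_right places a key equal to a split point in the partition
        # that starts at that split point.
        return bisect.bisect_right(points, key)


    partition_count = len(split_points) + 1
    placement = {key: partition_index(key, split_points) for key in (0, 999, 1000, 2500, 3000)}

If most keys fall between two adjacent split points, one partition takes most of the load, so split points are best chosen from the expected key distribution.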
Column Families
---------------
Column families group columns so that each group can have its own storage and compression settings.
They are useful when a table has both hot (frequently read) and cold (rarely read) columns:
.. code-block:: python
driver.table_client.create_table(
"/local/users",
ydb.TableDescription()
.with_columns(
ydb.Column("id", ydb.PrimitiveType.Uint64),
ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
ydb.Column("bio", ydb.OptionalType(ydb.PrimitiveType.Utf8), family="blobs"),
ydb.Column("avatar", ydb.OptionalType(ydb.PrimitiveType.String), family="blobs"),
)
.with_primary_keys("id")
.with_column_families(
ydb.ColumnFamily()
.with_name("blobs")
.with_compression(ydb.Compression.LZ4),
),
)
:class:`~ydb.Compression` values: ``NONE``, ``LZ4``, ``UNSPECIFIED``.
Bulk Upsert
-----------
``bulk_upsert`` loads rows without going through a transaction. It is faster than
individual ``INSERT``/``UPSERT`` statements for large batches and is idempotent
(safe to retry on failure).
Build a :class:`~ydb.BulkUpsertColumns` descriptor that lists the columns and their types,
then pass it together with the rows:
.. code-block:: python
from collections import namedtuple
User = namedtuple("User", ["id", "name", "age"])
rows = [
User(id=1, name="Alice", age=30),
User(id=2, name="Bob", age=25),
User(id=3, name="Carol", age=28),
]
column_types = (
ydb.BulkUpsertColumns()
.add_column("id", ydb.PrimitiveType.Uint64)
.add_column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
.add_column("age", ydb.OptionalType(ydb.PrimitiveType.Uint32))
)
driver.table_client.bulk_upsert("/local/users", rows, column_types)
Each row can be any object (for example, a named tuple or a dataclass) whose
attribute names match the column names in ``BulkUpsertColumns``.
.. note::
``bulk_upsert`` bypasses transaction guarantees: a single call is not atomic as a
whole, because rows are applied independently per partition, and there is no ordering
across calls. Prefer ``UPSERT`` via the Query service when transactional semantics matter.
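
When the input is large, a common approach is to split it into fixed-size batches and issue one ``bulk_upsert`` call per batch. The helper below is a generic sketch; ``chunked`` is a hypothetical name, and the batch size is an arbitrary illustration rather than a recommended value.

.. code-block:: python

    from itertools import islice
    from typing import Iterable, Iterator, List, TypeVar

    T = TypeVar("T")


    def chunked(rows: Iterable[T], size: int) -> Iterator[List[T]]:
        """Yield successive lists of at most `size` rows."""
        if size <= 0:
            raise ValueError("size must be positive")
        it = iter(rows)
        while True:
            chunk = list(islice(it, size))
            if not chunk:
                return
            yield chunk


    batches = list(chunked(range(7), 3))

Each chunk would then be passed to ``bulk_upsert`` in place of ``rows``. Because the operation is idempotent, a failed chunk can be retried without affecting the others.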
Streaming Reads
---------------
read_table
^^^^^^^^^^
``read_table`` streams all rows (or a key range) of a table without a
transaction. It is efficient for full-table exports and ETL pipelines:
.. code-block:: python
def callee(session: ydb.table.Session):
with session.read_table("/local/users", columns=("id", "name")) as it:
for result_set in it:
for row in result_set.rows:
print(row.id, row.name)
driver.table_client.retry_operation_sync(callee)
``read_table`` is called on a ``Session`` object. Access a session through
``retry_operation_sync``, which handles session lifecycle and retries.
**Read a key range:**
.. code-block:: python
key_type = ydb.TupleType().add_element(ydb.PrimitiveType.Uint64)
key_range = ydb.KeyRange(
from_bound=ydb.KeyBound.inclusive([1], key_type),
to_bound=ydb.KeyBound.exclusive([1000], key_type),
)
def callee(session):
with session.read_table(
"/local/users",
key_range=key_range,
columns=("id", "name"),
ordered=True,
) as it:
for result_set in it:
for row in result_set.rows:
print(row.id, row.name)
driver.table_client.retry_operation_sync(callee)
:class:`~ydb.KeyBound`\ ``.inclusive([value], type)`` — the bound row is included.
:class:`~ydb.KeyBound`\ ``.exclusive([value], type)`` — the bound row is excluded.
Pass ``None`` to ``from_bound`` or ``to_bound`` of :class:`~ydb.KeyRange` to mean
"from the beginning" or "to the end" of the table.
**Parameters for read_table:**
.. list-table::
:header-rows: 1
:widths: 25 75
* - Parameter
- Description
* - ``path``
- Full path to the table.
* - ``key_range``
- ``KeyRange`` to read; ``None`` reads the whole table.
* - ``columns``
- Tuple of column names to return; empty tuple returns all columns.
* - ``ordered``
- If ``True``, rows are returned in primary-key order (slower).
* - ``row_limit``
- Maximum number of rows to return.
* - ``use_snapshot``
- ``True`` to read a consistent snapshot (default server behaviour).
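
The inclusive and exclusive bounds of ``key_range`` can be modelled with a small predicate. This is a plain-Python illustration for a single integer key: ``in_range`` is a hypothetical helper, and a bound is represented as a ``(value, inclusive)`` tuple instead of a :class:`~ydb.KeyBound`.

.. code-block:: python

    from typing import Optional, Tuple

    Bound = Optional[Tuple[int, bool]]  # (value, inclusive), or None for open-ended


    def in_range(key: int, from_bound: Bound, to_bound: Bound) -> bool:
        """Return True if `key` falls inside the range (toy model)."""
        if from_bound is not None:
            value, inclusive = from_bound
            if key < value or (key == value and not inclusive):
                return False
        if to_bound is not None:
            value, inclusive = to_bound
            if key > value or (key == value and not inclusive):
                return False
        return True


    # Mirrors the example above: inclusive lower bound 1, exclusive upper bound 1000.
    lower, upper = (1, True), (1000, False)
    checks = [in_range(k, lower, upper) for k in (0, 1, 999, 1000)]

Half-open ranges like this one make it easy to read a table in consecutive slices: the exclusive upper bound of one slice becomes the inclusive lower bound of the next, so no row is read twice.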
scan_query
^^^^^^^^^^
``scan_query`` executes a YQL query in streaming mode — the server sends result
chunks as they are produced without buffering the entire result set:
.. code-block:: python

    for result_set in driver.table_client.scan_query(
        "SELECT id, name FROM users WHERE age > 25"
    ):
        for row in result_set.rows:
            print(row.id, row.name)
Unlike ``execute_with_retries`` from the Query service, ``scan_query`` does not
buffer results and cannot be used inside a transaction.
Pass parameters with a ``ScanQueryParameters`` object or a plain dict:
.. code-block:: python

    for result_set in driver.table_client.scan_query(
        "DECLARE $min_age AS Uint32; SELECT id, name FROM users WHERE age > $min_age",
        parameters={"$min_age": (25, ydb.PrimitiveType.Uint32)},
    ):
        for row in result_set.rows:
            print(row.id, row.name)
Async Usage
-----------
The async table client is accessed the same way via ``driver.table_client``
on an async driver. All I/O methods become coroutines:
.. code-block:: python
import asyncio
import ydb
async def main():
async with ydb.aio.Driver(
endpoint="grpc://localhost:2136", database="/local"
) as driver:
await driver.wait(timeout=5, fail_fast=True)
# Schema operations
await driver.table_client.create_table(
"/local/users",
ydb.TableDescription()
.with_columns(
ydb.Column("id", ydb.PrimitiveType.Uint64),
ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
)
.with_primary_keys("id"),
)
# Bulk upsert
column_types = (
ydb.BulkUpsertColumns()
.add_column("id", ydb.PrimitiveType.Uint64)
.add_column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
)
rows = [...]
await driver.table_client.bulk_upsert("/local/users", rows, column_types)
# Describe
entry = await driver.table_client.describe_table("/local/users")
for col in entry.columns:
print(col.name, col.type)
await driver.table_client.drop_table("/local/users")
asyncio.run(main())
Complete Example
----------------
Create a table with TTL, a secondary index, and custom partitioning; load data
with ``bulk_upsert``; stream it back with ``read_table``:
.. code-block:: python
import ydb
from collections import namedtuple
Event = namedtuple("Event", ["id", "user_id", "kind", "created_at"])
with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
driver.wait(timeout=5, fail_fast=True)
# Create table
driver.table_client.create_table(
"/local/events",
ydb.TableDescription()
.with_columns(
ydb.Column("id", ydb.PrimitiveType.Uint64),
ydb.Column("user_id", ydb.OptionalType(ydb.PrimitiveType.Uint64)),
ydb.Column("kind", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
ydb.Column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp)),
)
.with_primary_keys("id")
.with_indexes(
ydb.TableIndex("user_idx").with_index_columns("user_id")
)
.with_ttl(
ydb.TtlSettings().with_date_type_column(
"created_at",
expire_after_seconds=30 * 24 * 3600,
)
)
.with_partitioning_settings(
ydb.PartitioningSettings()
.with_min_partitions_count(4)
.with_max_partitions_count(64)
.with_partitioning_by_load(ydb.FeatureFlag.ENABLED)
),
)
# Bulk load
import datetime
now = datetime.datetime.now(tz=datetime.timezone.utc)
column_types = (
ydb.BulkUpsertColumns()
.add_column("id", ydb.PrimitiveType.Uint64)
.add_column("user_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))
.add_column("kind", ydb.OptionalType(ydb.PrimitiveType.Utf8))
.add_column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
)
rows = [Event(i, i % 10, "click", now) for i in range(10000)]
driver.table_client.bulk_upsert("/local/events", rows, column_types)
# Stream all rows back
def read_all(session):
with session.read_table("/local/events", columns=("id", "user_id", "kind")) as it:
for result_set in it:
for row in result_set.rows:
print(row.id, row.user_id, row.kind)
driver.table_client.retry_operation_sync(read_all)
# Clean up
driver.table_client.drop_table("/local/events")
Coordination Service
====================
.. warning::

   The Coordination Service API is experimental and may contain bugs.
   The interface may change in future releases.
The Coordination Service provides distributed primitives for managing shared state
across multiple processes or hosts:
* **Semaphores** — counting semaphores that work as distributed mutexes or
concurrency limiters. A semaphore with ``limit=1`` acts as an exclusive distributed lock.
* **Sessions** — persistent connections to a coordination node. The server tracks
session liveness; if a session drops without explicitly releasing a semaphore, the
server releases it automatically after the grace period expires.
A typical use case is leader election or limiting the number of concurrent workers
across a cluster.
Concepts
--------
**Coordination node** — a named entity in the YDB schema tree (like a table path)
that holds configuration and state for sessions and semaphores attached to it.
Think of it as a namespace for a set of semaphores.
**Session** — a stateful connection to a coordination node. Sessions survive
transient network failures through automatic reconnection. When a session ends
(gracefully or via timeout), all semaphores it holds are released automatically.
**Semaphore** — a server-side counter with a configurable ``limit``. A call to
``acquire(count=N)`` blocks until ``N`` units are available. ``acquire()`` with
the default ``count=1`` against a ``limit=1`` semaphore behaves as a mutex.
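The counting rule can be illustrated with a small in-process model. This is a sketch only: ``LocalSemaphoreModel`` is a hypothetical class, not part of the SDK, and it uses a non-blocking ``try_acquire`` instead of the blocking ``acquire`` so the outcome is easy to inspect.

```python
class LocalSemaphoreModel:
    """In-process model of a counting semaphore; hypothetical, not an SDK class."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.count = 0  # units currently held

    def try_acquire(self, count: int = 1) -> bool:
        """Take `count` units if they fit under the limit; never blocks."""
        if self.count + count > self.limit:
            return False
        self.count += count
        return True

    def release(self, count: int = 1) -> None:
        """Give back `count` units, never dropping below zero."""
        self.count = max(0, self.count - count)


mutex = LocalSemaphoreModel(limit=1)
first = mutex.try_acquire()   # succeeds: the lock was free
second = mutex.try_acquire()  # fails: limit=1 is already reached
mutex.release()
third = mutex.try_acquire()   # succeeds again after the release
print(first, second, third)
```

With ``limit=3`` the same model admits a request for two units followed by a request for one, and rejects a second request for two.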
Node Management
---------------
Create a Node
^^^^^^^^^^^^^
A coordination node must exist before sessions can attach to it.
.. code-block:: python
# Minimal creation — default config
driver.coordination_client.create_node("/local/my_node")
Create a node with explicit configuration:
.. code-block:: python
import ydb.coordination
config = ydb.coordination.NodeConfig(
attach_consistency_mode=ydb.coordination.ConsistencyMode.STRICT,
read_consistency_mode=ydb.coordination.ConsistencyMode.STRICT,
rate_limiter_counters_mode=ydb.coordination.RateLimiterCountersMode.AGGREGATED,
self_check_period_millis=1000,
session_grace_period_millis=10000,
)
driver.coordination_client.create_node("/local/my_node", config)
:class:`~ydb.coordination.NodeConfig` parameters:
.. list-table::
:header-rows: 1
:widths: 35 65
* - Parameter
- Description
* - ``attach_consistency_mode``
- :class:`~ydb.coordination.ConsistencyMode` for session attach operations.
``STRICT`` — strong consistency; ``RELAXED`` — may read slightly stale state
but lower latency.
* - ``read_consistency_mode``
- :class:`~ydb.coordination.ConsistencyMode` for semaphore describe/watch operations.
* - ``rate_limiter_counters_mode``
- :class:`~ydb.coordination.RateLimiterCountersMode` for metrics collection:
``AGGREGATED`` (single counter per semaphore) or ``DETAILED`` (per-owner breakdown).
* - ``self_check_period_millis``
- How often (ms) the server checks its own liveness. Lower values detect
failures faster but increase server load.
* - ``session_grace_period_millis``
- How long (ms) the server keeps a session alive after losing contact with the
client before forcibly releasing all its semaphores.
Describe and Alter a Node
^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python

    import ydb.coordination

    # Read current config
    config = driver.coordination_client.describe_node("/local/my_node")
    print(config.attach_consistency_mode)
    print(config.session_grace_period_millis)

    # Change config
    new_config = ydb.coordination.NodeConfig(
        attach_consistency_mode=ydb.coordination.ConsistencyMode.RELAXED,
        read_consistency_mode=ydb.coordination.ConsistencyMode.RELAXED,
        rate_limiter_counters_mode=ydb.coordination.RateLimiterCountersMode.DETAILED,
        self_check_period_millis=2000,
        session_grace_period_millis=15000,
    )
    driver.coordination_client.alter_node("/local/my_node", new_config)
Delete a Node
^^^^^^^^^^^^^
.. code-block:: python
driver.coordination_client.delete_node("/local/my_node")
Sessions
--------
A session binds a client to a coordination node. All semaphore operations go through
a session. Sessions reconnect automatically after transient network errors.
.. code-block:: python
# Use as context manager — session is closed on exit
with driver.coordination_client.session("/local/my_node") as session:
# ... work with semaphores
pass
# Or manage lifecycle manually
session = driver.coordination_client.session("/local/my_node")
try:
# ... work with semaphores
pass
finally:
session.close()
If the process crashes or the network drops, the server waits
``session_grace_period_millis`` before releasing the session's semaphores. This window
gives the client time to reconnect and reclaim ownership.
Semaphores
----------
Basic Usage (Mutex Pattern)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
A semaphore with ``limit=1`` acts as an exclusive distributed lock:
.. code-block:: python
with driver.coordination_client.session("/local/my_node") as session:
# Acquire returns when no other holder exists (blocks otherwise)
semaphore = session.semaphore("my_lock")
semaphore.acquire()
try:
# critical section
do_work()
finally:
semaphore.release()
The context manager form is more concise:
.. code-block:: python
with driver.coordination_client.session("/local/my_node") as session:
with session.semaphore("my_lock"):
do_work() # lock is held for the duration of the block
Counting Semaphore
^^^^^^^^^^^^^^^^^^
Set ``limit > 1`` to allow multiple concurrent holders — useful for limiting
the number of parallel workers:
.. code-block:: python
with driver.coordination_client.session("/local/my_node") as session:
# Allow up to 3 concurrent holders
semaphore = session.semaphore("worker_slots", limit=3)
semaphore.acquire() # acquire 1 slot (default count=1)
try:
do_work()
finally:
semaphore.release()
# Acquire multiple slots at once
semaphore.acquire(count=2) # hold 2 of the 3 slots
Inspecting a Semaphore
^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
with driver.coordination_client.session("/local/my_node") as session:
semaphore = session.semaphore("my_lock")
info = semaphore.describe()
print("limit: ", info.limit) # max count
print("count: ", info.count) # currently acquired count
print("owners: ", info.owners) # list of current holders
print("waiters: ", info.waiters) # list of processes waiting to acquire
Attaching Metadata
^^^^^^^^^^^^^^^^^^
Each semaphore can hold an opaque ``bytes`` payload. This is useful for the leader
to publish its address or other metadata so that followers can discover it:
.. code-block:: python
with driver.coordination_client.session("/local/my_node") as session:
semaphore = session.semaphore("leader_lock")
semaphore.acquire()
# Publish this process's address as leader metadata
semaphore.update(b"host=worker-1.internal:8080")
Async Usage
-----------
The async client mirrors the synchronous API with ``async with`` and ``await``:
.. code-block:: python
import asyncio
import ydb
async def main():
async with ydb.aio.Driver(
endpoint="grpc://localhost:2136",
database="/local",
) as driver:
await driver.wait(timeout=5, fail_fast=True)
await driver.coordination_client.create_node("/local/my_node")
async with driver.coordination_client.session("/local/my_node") as session:
async with session.semaphore("my_lock"):
await do_async_work()
asyncio.run(main())
Async is well suited for running multiple independent workers concurrently:
.. code-block:: python
async def worker(client, worker_id: int):
async with client.session("/local/my_node") as session:
for i in range(5):
async with session.semaphore("shared_resource"):
print(f"worker {worker_id}: step {i}")
await asyncio.sleep(0.1)
async def main():
async with ydb.aio.Driver(...) as driver:
await driver.wait(timeout=5, fail_fast=True)
await driver.coordination_client.create_node("/local/my_node")
await asyncio.gather(*(worker(driver.coordination_client, i) for i in range(4)))
Multi-threaded Sync Usage
--------------------------
The synchronous client is thread-safe. Multiple threads can share the same
``coordination_client`` and create independent sessions:
.. code-block:: python
import threading
import ydb
def worker(client, worker_id: int):
with client.session("/local/my_node") as session:
for i in range(5):
with session.semaphore("shared_resource"):
print(f"worker {worker_id}: step {i}")
with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
driver.wait(timeout=5, fail_fast=True)
driver.coordination_client.create_node("/local/my_node")
threads = [
threading.Thread(target=worker, args=(driver.coordination_client, i))
for i in range(4)
]
for t in threads:
t.start()
for t in threads:
t.join()
Leader Election Pattern
-----------------------
A common use case is electing a leader among a set of replicas. The replica that
acquires the semaphore becomes the leader; others block in ``acquire()`` and take
over if the leader's session expires:
.. code-block:: python
import socket
import ydb
def run_replica(driver: ydb.Driver):
my_address = socket.gethostname()
with driver.coordination_client.session("/local/election") as session:
semaphore = session.semaphore("leader", limit=1)
while True:
semaphore.acquire() # blocks until we become leader
semaphore.update(my_address.encode()) # publish our address
print(f"{my_address}: I am the leader")
try:
serve_as_leader() # run until an error or shutdown
except Exception:
pass
finally:
semaphore.release() # step down voluntarily
Scheme Service
==============
The Scheme service manages the YDB path hierarchy — databases, directories, tables,
topics, and other named objects. It is available through ``driver.scheme_client`` on
both the synchronous and asynchronous driver.
A typical use case is ensuring that a directory tree exists before creating tables,
or introspecting the schema at runtime.
Directories
-----------
Create a Directory
^^^^^^^^^^^^^^^^^^
Directories work like filesystem folders. They are used to organise tables and other
objects within a database:
.. code-block:: python
driver.scheme_client.make_directory("/local/my_app")
# Nested directories must be created one level at a time
driver.scheme_client.make_directory("/local/my_app/production")
driver.scheme_client.make_directory("/local/my_app/staging")
A helper to ensure an entire path exists, creating each missing segment:
.. code-block:: python
import os
import ydb
def ensure_path(driver: ydb.Driver, path: str) -> None:
parts = []
head = path.rstrip("/")
while head:
try:
if driver.scheme_client.describe_path(head).is_directory_or_database():
break
except ydb.SchemeError:
pass
parts.append(head)
head = os.path.dirname(head).rstrip("/")
for p in reversed(parts):
driver.scheme_client.make_directory(p)
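The helper above walks up the path and then creates segments from the top down. The traversal order can be checked without a database. The sketch below is plain Python; ``path_prefixes`` is a hypothetical name, and it lists every ancestor rather than stopping at the first existing directory.

```python
import posixpath
from typing import List


def path_prefixes(path: str) -> List[str]:
    """Return every ancestor of `path` plus the path itself, shortest first."""
    prefixes = []
    head = path.rstrip("/")
    while head:
        prefixes.append(head)
        # dirname("/local") is "/", which rstrip turns into "" and ends the loop.
        head = posixpath.dirname(head).rstrip("/")
    return list(reversed(prefixes))


print(path_prefixes("/local/my_app/production"))
# ['/local', '/local/my_app', '/local/my_app/production']
```

``posixpath`` is used here because YDB paths always use ``/`` as the separator, regardless of the host operating system.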
Remove a Directory
^^^^^^^^^^^^^^^^^^
The directory must be empty before it can be removed:
.. code-block:: python
driver.scheme_client.remove_directory("/local/my_app/staging")
List a Directory
^^^^^^^^^^^^^^^^
``list_directory`` returns a :class:`~ydb.SchemeEntry` for each immediate child of the path,
plus a ``self`` entry describing the path itself:
.. code-block:: python
listing = driver.scheme_client.list_directory("/local/my_app")
# listing.self — SchemeEntry for /local/my_app itself
print(listing.self.name, listing.self.type)
# listing.children — list of SchemeEntry for immediate children
for entry in listing.children:
print(entry.name, entry.type)
Filter by type to find, for example, all tables in a directory:
.. code-block:: python
listing = driver.scheme_client.list_directory("/local/my_app")
tables = [e for e in listing.children if e.is_table()]
topics = [e for e in listing.children if e.type == ydb.SchemeEntryType.TOPIC]
Describing Paths
----------------
``describe_path`` returns a :class:`~ydb.SchemeEntry` for any path — table, topic, directory,
coordination node, etc.:
.. code-block:: python
entry = driver.scheme_client.describe_path("/local/my_app/users")
print(entry.name) # "users"
print(entry.owner) # owner login
print(entry.type) # SchemeEntryType.TABLE
print(entry.size_bytes) # approximate size
SchemeEntry
-----------
Every scheme object is described by a :class:`~ydb.SchemeEntry`:
.. list-table::
:header-rows: 1
:widths: 25 75
* - Field
- Description
* - ``name``
- Object name (last path component).
* - ``type``
- :class:`~ydb.SchemeEntryType` enum value.
* - ``owner``
- Login of the object owner.
* - ``size_bytes``
- Approximate on-disk size (tables only; 0 for directories).
* - ``permissions``
- Explicit ACL entries set on this object.
* - ``effective_permissions``
- Effective ACL entries (including inherited from parent).
**Convenience predicates:**
.. code-block:: python
entry.is_directory() # DIRECTORY
entry.is_database() # DATABASE
entry.is_table() # TABLE (row-oriented)
entry.is_column_table() # COLUMN_TABLE (column-oriented)
entry.is_column_store() # COLUMN_STORE
entry.is_any_table() # TABLE or COLUMN_TABLE
entry.is_directory_or_database()
entry.is_coordination_node() # COORDINATION_NODE
entry.is_external_table() # EXTERNAL_TABLE
entry.is_external_data_source() # EXTERNAL_DATA_SOURCE
entry.is_view() # VIEW
entry.is_resource_pool() # RESOURCE_POOL
entry.is_sysview() # SYS_VIEW
**SchemeEntryType enum values:**
.. list-table::
:header-rows: 1
:widths: 40 60
* - Value
- Meaning
* - ``SchemeEntryType.DIRECTORY``
- Directory
* - ``SchemeEntryType.DATABASE``
- Database root
* - ``SchemeEntryType.TABLE``
- Row-oriented table
* - ``SchemeEntryType.COLUMN_TABLE``
- Column-oriented table
* - ``SchemeEntryType.COLUMN_STORE``
- Column store (groups column tables)
* - ``SchemeEntryType.TOPIC``
- Topic (message queue)
* - ``SchemeEntryType.PERS_QUEUE_GROUP``
- Legacy PersQueue group (predecessor of Topic)
* - ``SchemeEntryType.COORDINATION_NODE``
- Coordination service node
* - ``SchemeEntryType.EXTERNAL_TABLE``
- External table (federated query)
* - ``SchemeEntryType.EXTERNAL_DATA_SOURCE``
- External data source (federated query)
* - ``SchemeEntryType.VIEW``
- View
* - ``SchemeEntryType.SEQUENCE``
- Sequence (auto-increment counter)
* - ``SchemeEntryType.REPLICATION``
- Async replication object
* - ``SchemeEntryType.TRANSFER``
- Data transfer object
* - ``SchemeEntryType.RESOURCE_POOL``
- Resource pool (workload management)
* - ``SchemeEntryType.SYS_VIEW``
- System view
* - ``SchemeEntryType.TYPE_UNSPECIFIED``
- Unknown or unsupported type
Async Usage
-----------
The async scheme client has the same methods with ``await``:
.. code-block:: python
import asyncio
import ydb
async def main():
async with ydb.aio.Driver(
endpoint="grpc://localhost:2136", database="/local"
) as driver:
await driver.wait(timeout=5, fail_fast=True)
await driver.scheme_client.make_directory("/local/my_app")
listing = await driver.scheme_client.list_directory("/local")
for entry in listing.children:
print(entry.name, entry.type)
entry = await driver.scheme_client.describe_path("/local/my_app")
print(entry.is_directory())
await driver.scheme_client.remove_directory("/local/my_app")
asyncio.run(main())
Common Pattern: Idempotent Directory Setup
------------------------------------------
When initialising an application it is common to create the directory structure
once and tolerate the case where it already exists:
.. code-block:: python
import ydb
def setup_schema(driver: ydb.Driver, base_path: str) -> None:
for subdir in ("tables", "topics", "coordination"):
path = f"{base_path}/{subdir}"
try:
driver.scheme_client.make_directory(path)
except ydb.SchemeError:
pass # already exists
with ydb.QuerySessionPool(driver) as pool:
pool.execute_with_retries(
f"CREATE TABLE IF NOT EXISTS `{base_path}/tables/events` "
"(id Uint64, payload Utf8, PRIMARY KEY (id))"
)
OpenTelemetry Tracing
=====================
The SDK provides built-in distributed tracing via OpenTelemetry.
When enabled, key YDB operations (session creation, query execution, transaction
commit/rollback, and driver initialization) produce OpenTelemetry spans. Trace
context is automatically propagated to the YDB server through gRPC metadata using the
W3C Trace Context standard.
Tracing is **zero-cost when disabled**: the SDK uses no-op stubs by default, so there is
no overhead unless you explicitly opt in.
Installation
------------
OpenTelemetry packages are not included by default. Install the SDK with the
``opentelemetry`` extra:
.. code-block:: sh

    pip install "ydb[opentelemetry]"
This pulls in ``opentelemetry-api``. You will also need ``opentelemetry-sdk`` and an
exporter for your tracing backend, for example:
.. code-block:: sh
# OTLP/gRPC exporter (works with Jaeger, Tempo, and others)
pip install opentelemetry-exporter-otlp-proto-grpc
Enabling Tracing
----------------
Call ``enable_tracing()`` once, **after** configuring your OpenTelemetry tracer provider
and **before** creating a ``Driver``:
.. code-block:: python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
import ydb
from ydb.opentelemetry import enable_tracing
# 1. Set up OpenTelemetry
resource = Resource(attributes={"service.name": "my-service"})
provider = TracerProvider(resource=resource)
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
trace.set_tracer_provider(provider)
# 2. Enable YDB tracing
enable_tracing()
# 3. Use the SDK as usual — spans are created automatically
with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
driver.wait(timeout=5)
with ydb.QuerySessionPool(driver) as pool:
pool.execute_with_retries("SELECT 1")
provider.shutdown()
``enable_tracing()`` accepts an optional ``tracer`` argument. If omitted, the SDK
obtains a tracer named ``"ydb.sdk"`` from the global tracer provider.
Repeated calls to ``enable_tracing()`` do nothing until you call ``disable_tracing()``,
which removes hooks so you can reconfigure or turn instrumentation off.
What Is Instrumented
--------------------
The following operations produce spans:
.. list-table::
:header-rows: 1
:widths: 35 20 45
* - Span Name
- Kind
- Description
* - ``ydb.Driver.Initialize``
- INTERNAL
- Driver wait / endpoint discovery.
* - ``ydb.CreateSession``
- CLIENT
- Creating a new query session.
* - ``ydb.ExecuteQuery``
- CLIENT
- Executing a query (including ``execute_with_retries``).
* - ``ydb.BeginTransaction``
- CLIENT
- Explicitly beginning a transaction via ``.begin()``.
* - ``ydb.Commit``
- CLIENT
- Committing an explicit transaction.
* - ``ydb.Rollback``
- CLIENT
- Rolling back a transaction.
* - ``ydb.RunWithRetry``
- INTERNAL
- Umbrella span wrapping the whole retryable block (``retry_operation_*`` / ``retry_tx_*`` / ``execute_with_retries``).
* - ``ydb.Try``
- INTERNAL
- A single retry attempt. From the **second** attempt onward, the span carries
``ydb.retry.backoff_ms`` — how long the retrier slept before starting this
attempt (``0`` on the skip-yield retry path: ``Aborted`` / ``BadSession`` /
``NotFound`` / ``InternalError``, where the protocol prescribes immediate
retry without backoff). The very first ``ydb.Try`` omits the attribute
entirely because nothing preceded it.
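The attribute rule for ``ydb.Try`` spans can be restated as a short sketch. It is not the SDK's retrier: ``attempt_attributes`` is a hypothetical helper and the 50 ms value is made up for illustration. It only shows which attempts carry ``ydb.retry.backoff_ms``.

```python
from typing import Dict, List


def attempt_attributes(backoffs_ms: List[int]) -> List[Dict[str, int]]:
    """Build per-attempt attribute dicts.

    `backoffs_ms[i]` is the sleep that preceded attempt i + 2. The first
    attempt has no preceding sleep, so it gets no backoff attribute at all.
    """
    attrs: List[Dict[str, int]] = [{}]  # first attempt: attribute omitted
    for backoff in backoffs_ms:
        attrs.append({"ydb.retry.backoff_ms": backoff})
    return attrs


# Three attempts: an immediate retry (0 ms), then a retry after a 50 ms sleep.
spans = attempt_attributes([0, 50])
print(spans)
```

An immediate retry is recorded as ``0`` rather than omitted, which keeps it distinguishable from the first attempt.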
All spans are nested under the currently active span, so wrapping your application
logic in a parent span produces a complete trace tree:
.. code-block:: python
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("handle-request"):
pool.execute_with_retries("SELECT 1")
# ↳ ydb.CreateSession (if a new session is needed)
# ↳ ydb.ExecuteQuery
Span Attributes
---------------
Every YDB RPC (CLIENT-kind) span carries these semantic attributes:
.. list-table::
:header-rows: 1
:widths: 30 70
* - Attribute
- Description
* - ``db.system.name``
- Always ``"ydb"``.
* - ``db.namespace``
- Database path (e.g. ``"/local"``).
* - ``server.address``
- Host from the connection string.
* - ``server.port``
- Port from the connection string.
* - ``network.peer.address``
- Actual node host from the discovery endpoint map (set once the session is attached to a node).
* - ``network.peer.port``
- Actual node port from the discovery endpoint map.
* - ``ydb.node.dc``
- Data-center / location reported by discovery for the node (e.g. ``"vla"``, ``"sas"``).
Additional attributes are set when available:
.. list-table::
:header-rows: 1
:widths: 30 70
* - Attribute
- Description
* - ``ydb.node.id``
- YDB node that handled the request.
On errors, the span also records:
- ``error.type`` — ``"ydb_error"``, ``"transport_error"``, or the Python exception class name.
- ``db.response.status_code`` — the YDB status code name (e.g. ``"SCHEME_ERROR"``).
Trace Context Propagation
-------------------------
When tracing is enabled, the SDK automatically injects trace context headers into
every gRPC call to YDB using the globally configured OpenTelemetry propagator
(``opentelemetry.propagate.inject``). By default, OpenTelemetry uses the
W3C Trace Context propagator, which adds
``traceparent`` and ``tracestate`` headers.
The YDB server expects W3C Trace Context headers, so the default propagator configuration
works out of the box. This allows the server to correlate client spans with
server-side processing, enabling end-to-end trace visibility across the entire
request path.
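For reference, a version-00 ``traceparent`` value has four hyphen-separated lowercase hex fields: a 2-character version, a 32-character trace ID, a 16-character parent span ID, and 2-character flags. The sketch below parses such a value with the standard library only; ``parse_traceparent`` is a hypothetical helper, not an SDK or OpenTelemetry function.

```python
import re
from typing import Dict

_TRACEPARENT_RE = re.compile(
    r"^(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<parent_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})$"
)


def parse_traceparent(header: str) -> Dict[str, str]:
    """Split a `traceparent` header value into its four named fields."""
    match = _TRACEPARENT_RE.match(header)
    if match is None:
        raise ValueError(f"malformed traceparent: {header!r}")
    return match.groupdict()


# Example value from the W3C Trace Context specification.
fields = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(fields["trace_id"], fields["flags"])
```

A helper like this can be handy when checking captured gRPC metadata against the trace IDs shown in a tracing backend.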
Async Usage
-----------
Tracing works identically with the async driver. Call ``enable_tracing()`` once at
startup:
.. code-block:: python
import asyncio
import ydb
from ydb.opentelemetry import enable_tracing
enable_tracing()
async def main():
async with ydb.aio.Driver(
endpoint="grpc://localhost:2136",
database="/local",
) as driver:
await driver.wait(timeout=5)
async with ydb.aio.QuerySessionPool(driver) as pool:
await pool.execute_with_retries("SELECT 1")
asyncio.run(main())
Using a Custom Tracer
---------------------
To use a specific tracer instead of the global one:
.. code-block:: python
from opentelemetry import trace
my_tracer = trace.get_tracer("my.custom.tracer")
enable_tracing(tracer=my_tracer)
Running the Examples
--------------------
The runnable script is ``examples/opentelemetry/otel_example.py``. It creates a bank table,
runs concurrent Serializable transactions, and emits the ``app_startup`` and ``example_tli``
application spans. **Start Docker (YDB or the full stack) first**, then install and run on the
host; see ``examples/opentelemetry/README.md`` for the full order of commands and environment variables.
**Full stack in one command** (YDB + OTLP + Tempo + Grafana; the ``otel-example`` service is built from ``examples/opentelemetry/Dockerfile`` and runs the script once):
.. code-block:: sh
docker compose -f examples/opentelemetry/compose-e2e.yaml up --build
The first run builds the ``otel-example`` image from the local SDK source; subsequent runs reuse the cached image. Pass ``--build`` again if you change the SDK or the demo script.
**Typical local run** (YDB in Docker, script on the host — Compose **before** ``pip`` / ``python``):
.. code-block:: sh
docker compose up -d
pip install -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt
python examples/opentelemetry/otel_example.py
Open http://localhost:3000 (Grafana) to explore traces via Tempo.
YDB Types
=========
YDB has its own type system. When passing parameters to queries you can let the SDK
infer the YDB type from the Python value automatically, or declare the type explicitly
when you need precise control.
Implicit Type Mapping
---------------------
For most common cases, pass a plain Python value and the SDK will pick the right type:
.. list-table::
:header-rows: 1
:widths: 30 30 40
* - Python value
- YDB type inferred
- Notes
* - ``bool``
- ``Bool``
-
* - ``int``
- ``Int64``
-
* - ``float``
- ``Double``
-
* - ``str``
- ``Utf8``
-
* - ``bytes``
- ``String``
-
* - ``dict``
- ``Dict<K, V>``
- key and value types inferred from the entries; the dict must be homogeneous
* - ``list``
- ``List<T>``
- item type inferred from the elements; the list must be homogeneous
.. code-block:: python
pool.execute_with_retries(
"DECLARE $name AS Utf8; SELECT * FROM users WHERE name = $name",
parameters={"$name": "Alice"}, # str → Utf8
)
pool.execute_with_retries(
"DECLARE $active AS Bool; SELECT * FROM users WHERE active = $active",
parameters={"$active": True}, # bool → Bool
)
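The scalar rows of the table above can be written as a lookup. This is a sketch of the mapping as documented here, not the SDK's internal code; ``inferred_type_name`` is a hypothetical helper that returns type names as strings.

```python
from typing import Any

# Scalar rows of the table above, keyed by exact Python type.
_SCALAR_TYPE_NAMES = {
    bool: "Bool",
    int: "Int64",
    float: "Double",
    str: "Utf8",
    bytes: "String",
}


def inferred_type_name(value: Any) -> str:
    """Return the YDB type name the table lists for a scalar Python value."""
    # Look up the exact type: bool is a subclass of int, so an isinstance()
    # check against int would wrongly classify True/False as Int64.
    try:
        return _SCALAR_TYPE_NAMES[type(value)]
    except KeyError:
        raise TypeError(f"no scalar mapping listed for {type(value).__name__}") from None


print(inferred_type_name(True), inferred_type_name(42), inferred_type_name("Alice"))
```

Note that every Python ``int`` maps to ``Int64``; a column declared as ``Uint64`` or ``Int32`` therefore needs an explicit type.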
Explicit Types
--------------
When the automatic mapping is ambiguous or you need a type that cannot be inferred,
declare the type explicitly using one of two forms.
Tuple form ``(value, type)``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
parameters={
"$id": (42, ydb.PrimitiveType.Uint64),
}
``ydb.TypedValue``
^^^^^^^^^^^^^^^^^^
.. code-block:: python
parameters={
"$id": ydb.TypedValue(42, ydb.PrimitiveType.Uint64),
}
Both forms are equivalent. The tuple form is more concise; :class:`~ydb.TypedValue` is more
explicit and works well with type checkers.
PrimitiveType Reference
-----------------------
Boolean and integers
^^^^^^^^^^^^^^^^^^^^
.. list-table::
:header-rows: 1
:widths: 30 20 50
* - Type
- Range
- Notes
* - ``PrimitiveType.Bool``
- true / false
-
* - ``PrimitiveType.Int8``
- −128 … 127
-
* - ``PrimitiveType.Uint8``
- 0 … 255
-
* - ``PrimitiveType.Int16``
- −32 768 … 32 767
-
* - ``PrimitiveType.Uint16``
- 0 … 65 535
-
* - ``PrimitiveType.Int32``
- −2³¹ … 2³¹−1
-
* - ``PrimitiveType.Uint32``
- 0 … 2³²−1
-
* - ``PrimitiveType.Int64``
- −2⁶³ … 2⁶³−1
- default for ``int``
* - ``PrimitiveType.Uint64``
- 0 … 2⁶⁴−1
-
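Because a Python ``int`` is unbounded, it can help to validate a value against the ranges in the table above before binding it to a narrower type. The sketch below is plain Python; ``int_range`` and ``fits`` are hypothetical helpers, and the type names are passed as strings rather than ``PrimitiveType`` members.

```python
from typing import Tuple

_SIGNED_BITS = {"Int8": 8, "Int16": 16, "Int32": 32, "Int64": 64}
_UNSIGNED_BITS = {"Uint8": 8, "Uint16": 16, "Uint32": 32, "Uint64": 64}


def int_range(type_name: str) -> Tuple[int, int]:
    """Return the inclusive (min, max) range of a YDB integer type."""
    if type_name in _SIGNED_BITS:
        bits = _SIGNED_BITS[type_name]
        return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1
    if type_name in _UNSIGNED_BITS:
        return 0, 2 ** _UNSIGNED_BITS[type_name] - 1
    raise ValueError(f"not an integer type: {type_name}")


def fits(value: int, type_name: str) -> bool:
    """Return True if `value` is representable in the given integer type."""
    low, high = int_range(type_name)
    return low <= value <= high


print(int_range("Int8"))       # (-128, 127)
print(fits(2 ** 63, "Int64"))  # False: one past the Int64 maximum
print(fits(2 ** 63, "Uint64")) # True
```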
Floating point
^^^^^^^^^^^^^^
.. list-table::
:header-rows: 1
:widths: 30 70
* - Type
- Notes
* - ``PrimitiveType.Float``
- 32-bit IEEE 754
* - ``PrimitiveType.Double``
- 64-bit IEEE 754; default for Python ``float``
Text and binary
^^^^^^^^^^^^^^^
.. list-table::
:header-rows: 1
:widths: 30 70
* - Type
- Notes
* - ``PrimitiveType.Utf8``
- UTF-8 encoded text; default for Python ``str``
* - ``PrimitiveType.String``
- arbitrary bytes; default for Python ``bytes``
* - ``PrimitiveType.Json``
- JSON text stored as ``Utf8``
* - ``PrimitiveType.JsonDocument``
- JSON stored in a binary representation optimised for field access
* - ``PrimitiveType.Yson``
- YSON binary/text format (YDB-specific)
* - ``PrimitiveType.DyNumber``
- arbitrary-precision decimal stored as text
Date and time
^^^^^^^^^^^^^
.. list-table::
:header-rows: 1
:widths: 30 70
* - Type
- Notes
* - ``PrimitiveType.Date``
- calendar date; maps to/from ``datetime.date``
* - ``PrimitiveType.Datetime``
- date + time without timezone (second precision)
* - ``PrimitiveType.Timestamp``
- date + time without timezone (microsecond precision); maps to/from ``datetime.datetime``
* - ``PrimitiveType.Interval``
- duration; maps to/from ``datetime.timedelta``
* - ``PrimitiveType.Date32``
- extended-range date (supports negative years)
* - ``PrimitiveType.Datetime64``
- extended-range datetime (second precision)
* - ``PrimitiveType.Timestamp64``
- extended-range timestamp (microsecond precision)
* - ``PrimitiveType.Interval64``
- extended-range interval
Other
^^^^^
.. list-table::
:header-rows: 1
:widths: 30 70
* - Type
- Notes
* - ``PrimitiveType.UUID``
- UUID stored as two 64-bit integers; maps to/from ``uuid.UUID``
.. code-block:: python
import datetime
import ydb
pool.execute_with_retries(
"DECLARE $ts AS Timestamp; SELECT * FROM events WHERE created_at > $ts",
parameters={
"$ts": ydb.TypedValue(
datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
ydb.PrimitiveType.Timestamp,
)
},
)
Collection Types
----------------
OptionalType
^^^^^^^^^^^^
Wraps any type to allow ``None`` values. Use when a parameter or column is nullable:
.. code-block:: python
# Nullable Utf8 — pass None or a string
ydb.OptionalType(ydb.PrimitiveType.Utf8)
pool.execute_with_retries(
"DECLARE $nickname AS Utf8?; "
"INSERT INTO users (id, nickname) VALUES (1, $nickname)",
parameters={"$nickname": (None, ydb.OptionalType(ydb.PrimitiveType.Utf8))},
)
ListType
^^^^^^^^
A homogeneous ordered sequence:
.. code-block:: python

    ydb.ListType(ydb.PrimitiveType.Int64)

    pool.execute_with_retries(
        "DECLARE $ids AS List<Int64>; "
        "SELECT * FROM users WHERE id IN $ids",
        parameters={"$ids": ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64))},
    )
DictType
^^^^^^^^
A map from a key type to a value type:
.. code-block:: python

    # Dict<Utf8, Int64>
    ydb.DictType(ydb.PrimitiveType.Utf8, ydb.PrimitiveType.Int64)

    parameters={
        "$scores": (
            {"alice": 10, "bob": 20},
            ydb.DictType(ydb.PrimitiveType.Utf8, ydb.PrimitiveType.Int64),
        )
    }
StructType
^^^^^^^^^^
A named record — useful for passing rows as parameters:
.. code-block:: python

    row_type = (
        ydb.StructType()
        .add_member("id", ydb.PrimitiveType.Uint64)
        .add_member("name", ydb.PrimitiveType.Utf8)
    )

    pool.execute_with_retries(
        "DECLARE $row AS Struct<id: Uint64, name: Utf8>; "
        "INSERT INTO users (id, name) VALUES ($row.id, $row.name)",
        parameters={
            "$row": ({"id": 1, "name": "Alice"}, row_type)
        },
    )
Combine :class:`~ydb.ListType` + :class:`~ydb.StructType` for bulk inserts:
.. code-block:: python
row_type = (
ydb.StructType()
.add_member("id", ydb.PrimitiveType.Uint64)
.add_member("name", ydb.PrimitiveType.Utf8)
)
rows_type = ydb.ListType(row_type)
rows = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
]
pool.execute_with_retries(
"DECLARE $rows AS List<Struct<id: Uint64, name: Utf8>>; "
"INSERT INTO users SELECT id, name FROM AS_TABLE($rows)",
parameters={"$rows": (rows, rows_type)},
)
TupleType
^^^^^^^^^
A positional (unnamed) record. Build it by chaining ``add_element``:
.. code-block:: python
point_type = (
ydb.TupleType()
.add_element(ydb.PrimitiveType.Double) # x
.add_element(ydb.PrimitiveType.Double) # y
)
parameters={"$point": ([1.0, 2.0], point_type)}
DecimalType
^^^^^^^^^^^
Fixed-precision decimal number:
.. code-block:: python
# Decimal(22, 9) — 22 total digits, 9 after the decimal point (default)
ydb.DecimalType()
# Custom precision
ydb.DecimalType(precision=10, scale=2)
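To make precision and scale concrete: ``Decimal(22, 9)`` leaves 22 minus 9, that is 13, digits for the integer part. The sketch below uses only the standard ``decimal`` module to check whether a value fits a given precision and scale. ``fits_decimal`` is a hypothetical helper written for this illustration; it is not part of the SDK, and the SDK's own validation may differ.

.. code-block:: python

    from decimal import Decimal


    def fits_decimal(value, precision=22, scale=9):
        """Return True if value fits Decimal(precision, scale).

        Hypothetical helper. Trailing zeros count as digits, so pass
        strings or Decimal objects rather than floats.
        """
        sign, digits, exponent = Decimal(value).as_tuple()
        if not isinstance(exponent, int):  # NaN or Infinity
            return False
        fractional = max(-exponent, 0)            # digits after the point
        integer = max(len(digits) + exponent, 0)  # digits before the point
        return fractional <= scale and integer <= precision - scale


    print(fits_decimal("1234567890123.123456789"))        # 13 integer + 9 fractional digits
    print(fits_decimal("12345678901234.5"))               # 14 integer digits
    print(fits_decimal("19.999", precision=10, scale=2))  # 3 fractional digits

Only the first call fits; the other two exceed the integer part and the scale respectively.
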
Reading Values from Result Sets
--------------------------------
When you read from a result set, the SDK converts YDB values back to Python objects:
.. list-table::
:header-rows: 1
:widths: 30 70
* - YDB type
- Python value returned
* - ``Bool``
- ``bool``
* - ``Int*/Uint*``
- ``int``
* - ``Float`` / ``Double``
- ``float``
* - ``Utf8``, ``Json``, ``JsonDocument``
- ``str``
* - ``String``, ``Yson``
- ``bytes``
* - ``Date``
- ``datetime.date``
* - ``Datetime`` / ``Datetime64``
- ``datetime.datetime``
* - ``Timestamp`` / ``Timestamp64``
- ``datetime.datetime``
* - ``Interval`` / ``Interval64``
- ``datetime.timedelta``
* - ``UUID``
- ``str`` (formatted as ``xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx``)
* - ``Decimal``
- ``decimal.Decimal``
* - ``Optional``
- the inner value, or ``None``
.. code-block:: python
result_sets = pool.execute_with_retries(
"SELECT id, name, created_at, score FROM users WHERE id = 1"
)
row = result_sets[0].rows[0]
row["id"] # int
row["name"] # str (Utf8 column)
row["created_at"] # datetime.datetime (Timestamp column)
row["score"] # float (Double column)
.. note::
``String`` columns return ``bytes``, not ``str``. If you store text in a
``String`` column, decode it explicitly: ``row["col"].decode("utf-8")``.
Prefer ``Utf8`` for text data.
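If several ``String`` columns hold text, the decoding can be kept in one place. The helper below is a sketch: ``decode_columns`` is a hypothetical name, and a plain ``dict`` stands in for a result row so that the example runs without a database.

.. code-block:: python

    def decode_columns(row, columns, encoding="utf-8"):
        """Return a copy of row with the named bytes columns decoded to str.

        Hypothetical helper; columns that are None or already str are
        left unchanged.
        """
        decoded = dict(row)
        for name in columns:
            if isinstance(decoded[name], bytes):
                decoded[name] = decoded[name].decode(encoding)
        return decoded


    # A plain dict standing in for a row with two String columns.
    row = {"id": 1, "title": b"caf\xc3\xa9", "payload": b"\x00\x01"}
    print(decode_columns(row, ["title"]))

Naming the columns explicitly keeps genuinely binary values, such as ``payload`` here, as ``bytes``.
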
Errors and Retries
==================
The SDK maps every YDB server status code to a Python exception class. All exceptions
inherit from :class:`~ydb.Error`, so you can catch the whole hierarchy with a single
``except ydb.Error`` clause, or handle individual error types precisely.
Exception Hierarchy
-------------------
.. code-block:: text
ydb.Error
├── ydb.BadRequest — malformed query or invalid argument
├── ydb.Unauthorized — authentication succeeded but operation not permitted
├── ydb.Unauthenticated — authentication failed
├── ydb.InternalError — server-side internal error (usually transient)
├── ydb.Aborted — transaction aborted due to a conflict; safe to retry
├── ydb.Unavailable — service temporarily unavailable; safe to retry
├── ydb.Overloaded — server is overloaded; retry with backoff
├── ydb.SchemeError — schema-related error (e.g. table not found, already exists)
├── ydb.GenericError — unclassified server error
├── ydb.Timeout — server-side deadline exceeded
├── ydb.BadSession — session is invalid or expired; create a new one
├── ydb.PreconditionFailed — operation precondition not met
├── ydb.AlreadyExists — object already exists (DDL)
├── ydb.NotFound — object not found
├── ydb.SessionExpired — session TTL expired
├── ydb.Cancelled — operation was cancelled
├── ydb.Undetermined — outcome is unknown (e.g. network lost before ack)
├── ydb.Unsupported — operation not supported by this server version
├── ydb.SessionBusy — session is executing another request
├── ydb.ExternalError — error in an external data source
├── ydb.TruncatedResponseError — result set was truncated by the server
├── ydb.SessionPoolEmpty — all sessions are busy; pool exhausted
├── ydb.SessionPoolClosed — session pool has been stopped
├── ydb.ConnectionError — base class for transport-level errors
│ ├── ydb.ConnectionFailure — could not establish connection
│ ├── ydb.ConnectionLost — connection dropped mid-request
│ └── ydb.Unimplemented — server does not support this RPC
└── ydb.DeadlineExceed — client-side deadline exceeded
Catching Errors
---------------
.. code-block:: python

    import ydb

    try:
        pool.execute_with_retries("SELECT * FROM users")
    except ydb.SchemeError:
        print("table does not exist")
    except ydb.Unauthorized:
        print("access denied")
    except ydb.Unavailable:
        print("service temporarily unavailable")
    except ydb.Error as e:
        print(f"other YDB error: {e}")

Each exception exposes:
* ``str(e)`` — human-readable message from the server.
* ``e.issues`` — list of structured ``IssueMessage`` objects with ``message``,
``issue_code``, and ``severity``.
* ``e.status`` — the ``ydb.StatusCode`` enum value.
.. code-block:: python

    try:
        pool.execute_with_retries("BAD QUERY")
    except ydb.Error as e:
        print(e.status)   # e.g. StatusCode.BAD_REQUEST
        print(e.message)
        if e.issues:
            for issue in e.issues:
                print(issue.message, issue.severity)

Retriable vs Non-Retriable Errors
----------------------------------
Not every error is worth retrying. The SDK classifies errors into four groups:
**Retriable with fast backoff:**
* :class:`~ydb.Unavailable` — service is temporarily unavailable.
* ``ClientInternalError`` — internal SDK error.
* ``SessionExpired`` — session TTL passed; the SDK opens a new one.
* :class:`~ydb.NotFound` — by default retried (configurable via ``retry_not_found``).
* ``Cancelled`` — only when ``retry_cancelled=True`` is set.
**Retriable with slow backoff:**
* :class:`~ydb.Aborted` — transaction conflict; the whole transaction must be replayed.
* :class:`~ydb.BadSession` — session is invalid; the SDK acquires a new one.
* :class:`~ydb.Overloaded` — server under heavy load; back off and try again.
* :class:`~ydb.SessionPoolEmpty` — all pool sessions busy; wait and retry.
* :class:`~ydb.ConnectionError` / :class:`~ydb.ConnectionLost` — network issues; reconnect and retry.
**Retriable only for idempotent operations (slow backoff):**
* :class:`~ydb.Undetermined` — outcome unknown; only safe to retry if the operation is idempotent
(i.e. repeating it has the same effect as running it once). Set ``idempotent=True``
in :class:`~ydb.RetrySettings` to enable.
**Never retried:**
* :class:`~ydb.BadRequest`, :class:`~ydb.Unauthorized`, ``Unauthenticated``,
:class:`~ydb.SchemeError`, :class:`~ydb.AlreadyExists`, ``Unsupported``,
:class:`~ydb.Timeout`, ``PreconditionFailed``, ``ExternalError`` — these indicate
a problem with the query, credentials, or schema that won't resolve by retrying.
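The grouping above amounts to a small decision table. The following sketch shows one way a retry loop built on it could be structured; it is not the SDK's implementation. The exception classes are local stand-ins named after the SDK classes so that the example runs without a server, and ``classify``, ``run_with_retries``, and the doubling delays are assumptions made for illustration.

.. code-block:: python

    import time


    # Local stand-ins named after the SDK classes (assumption: real code
    # would catch ydb.Unavailable, ydb.Overloaded, and so on instead).
    class Unavailable(Exception):
        pass


    class Overloaded(Exception):
        pass


    class Undetermined(Exception):
        pass


    class BadRequest(Exception):
        pass


    def classify(exc, idempotent):
        """Return "fast", "slow", or None (do not retry) for an exception."""
        if isinstance(exc, Unavailable):
            return "fast"
        if isinstance(exc, Overloaded):
            return "slow"
        if isinstance(exc, Undetermined) and idempotent:
            return "slow"
        return None


    def run_with_retries(operation, max_retries=3, idempotent=False, sleep=time.sleep):
        """Call operation() until it succeeds or retrying is no longer allowed."""
        for attempt in range(max_retries + 1):
            try:
                return operation()
            except Exception as exc:
                kind = classify(exc, idempotent)
                if kind is None or attempt == max_retries:
                    raise
                # Illustrative delays only: 5 ms base for fast, 1 s base for slow.
                base = 0.005 if kind == "fast" else 1.0
                sleep(base * 2 ** attempt)


    calls = []


    def flaky():
        """Fail twice with a retriable error, then succeed."""
        calls.append(1)
        if len(calls) < 3:
            raise Unavailable("try again")
        return "ok"


    delays = []
    # Record the delays instead of actually sleeping.
    result = run_with_retries(flaky, sleep=delays.append)
    print(result, delays)

Passing ``sleep`` as a parameter keeps the loop testable: the example records the two fast-backoff delays without waiting for them.
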
RetrySettings
-------------
All pool methods that perform retries (``execute_with_retries``, ``retry_operation_sync``,
``retry_tx_sync``) accept an optional :class:`~ydb.RetrySettings` object:
.. code-block:: python
import ydb
retry = ydb.RetrySettings(
max_retries=5, # max retry attempts (default: 10)
idempotent=False, # set True to also retry Undetermined errors
retry_cancelled=False, # set True to retry Cancelled errors
)
pool.execute_with_retries("SELECT 1", retry_settings=retry)
BackoffSettings
^^^^^^^^^^^^^^^
:class:`~ydb.RetrySettings` uses two separate backoff curves:
* **fast backoff** — used for errors expected to clear quickly (:class:`~ydb.Unavailable`,
``SessionExpired``, etc.).
* **slow backoff** — used for errors where the server needs more breathing room
(:class:`~ydb.Overloaded`, :class:`~ydb.Aborted`, connection failures, etc.).
Both are instances of :class:`~ydb.BackoffSettings`:
.. code-block:: python
fast = ydb.BackoffSettings(
ceiling=10, # exponent cap: max wait slot = 2^ceiling * slot_duration
slot_duration=0.005, # base time unit in seconds (default: 5 ms)
uncertain_ratio=0.5, # fraction of the window that is randomised (jitter)
)
slow = ydb.BackoffSettings(
ceiling=6,
slot_duration=1.0, # 1 second base (default)
uncertain_ratio=0.5,
)
retry = ydb.RetrySettings(
max_retries=10,
fast_backoff_settings=fast,
slow_backoff_settings=slow,
)
The actual sleep duration for retry ``n`` is:
.. code-block:: text
slots = 2 ^ min(n, ceiling)
max_ms = slots * slot_duration * 1000
sleep = max_ms * (random() * uncertain_ratio + (1 - uncertain_ratio)) / 1000
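The formula can be transcribed into plain Python to see which delays a given configuration produces. This is a sketch that mirrors the pseudocode above, not the SDK's internal code; ``backoff_delay`` and its ``rng`` parameter are names invented for this illustration.

.. code-block:: python

    import random


    def backoff_delay(n, ceiling, slot_duration, uncertain_ratio, rng=random.random):
        """Return the sleep time in seconds for retry number n.

        Hypothetical helper that transcribes the formula above; rng is
        injectable so the bounds can be computed deterministically.
        """
        slots = 2 ** min(n, ceiling)
        max_ms = slots * slot_duration * 1000
        jitter = rng() * uncertain_ratio + (1 - uncertain_ratio)
        return max_ms * jitter / 1000


    # Range of delays for ceiling=10, slot_duration=0.005, uncertain_ratio=0.5.
    # rng() == 0 gives the lower bound; rng() approaching 1 gives the upper bound.
    for n in range(4):
        lower = backoff_delay(n, 10, 0.005, 0.5, rng=lambda: 0.0)
        upper = backoff_delay(n, 10, 0.005, 0.5, rng=lambda: 1.0)
        print(f"retry {n}: {lower:.4f}s .. {upper:.4f}s")

With ``uncertain_ratio=0.5`` every delay falls between half of the maximum and the maximum for that retry number, and the maximum stops growing once ``n`` reaches ``ceiling``.
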
Error Callback
^^^^^^^^^^^^^^
To log or instrument every retry attempt, pass ``on_ydb_error_callback``:
.. code-block:: python

    import logging

    logger = logging.getLogger(__name__)


    def log_retry(err: ydb.Error):
        logger.warning("YDB error, will retry: %s", err)


    retry = ydb.RetrySettings(
        max_retries=10,
        on_ydb_error_callback=log_retry,
    )
    pool.execute_with_retries("SELECT 1", retry_settings=retry)

``@ydb_retry`` Decorator
------------------------
For functions that run outside a session pool, use the ``@ydb.ydb_retry`` decorator.
It wraps both synchronous and asynchronous functions:
.. code-block:: python

    import ydb


    @ydb.ydb_retry(max_retries=5, idempotent=True)
    def fetch_user(driver: ydb.Driver, user_id: int):
        with ydb.QuerySessionPool(driver) as pool:
            result = pool.execute_with_retries(
                "SELECT name FROM users WHERE id = $id",
                parameters={"$id": user_id},
            )
            return result[0].rows[0]["name"]


    # Async version; the decorator detects coroutines automatically:
    @ydb.ydb_retry(max_retries=5, idempotent=True)
    async def fetch_user_async(driver: ydb.aio.Driver, user_id: int):
        async with ydb.aio.QuerySessionPool(driver) as pool:
            result = await pool.execute_with_retries(
                "SELECT name FROM users WHERE id = $id",
                parameters={"$id": user_id},
            )
            return result[0].rows[0]["name"]

.. note::
The decorator retries the **entire function** on failure, not just the individual
query. Only use ``idempotent=True`` when repeating the full function body is safe.
Handling ``Undetermined``
--------------------------
:class:`~ydb.Undetermined` means the network was lost before the server could confirm whether
the operation succeeded or failed. The server may have applied the write — or not.
For **read-only** queries this is always safe to retry. For **writes**, only retry if
you can tolerate duplicates or the query is naturally idempotent (e.g. ``UPSERT``):
.. code-block:: python
retry = ydb.RetrySettings(
max_retries=10,
idempotent=True, # enables retry on Undetermined
)
# Safe: UPSERT is idempotent
pool.execute_with_retries(
"UPSERT INTO events (id, data) VALUES (42, 'payload')",
retry_settings=retry,
)
# Not idempotent: if the first attempt was applied but came back as Undetermined,
# the retry fails with AlreadyExists. That error is not retriable, so the call
# raises even though the row was written.
pool.execute_with_retries(
"INSERT INTO events (id, data) VALUES (42, 'payload')",
retry_settings=retry,
)
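Why the distinction matters can be shown without a database. The sketch below models a table as a ``dict`` and replays each write twice, as a retry after ``Undetermined`` would. ``upsert`` and ``insert`` are toy stand-ins, not SDK functions, and the ``KeyError`` only stands in for whatever error the server returns on a duplicate key.

.. code-block:: python

    def upsert(table, key, value):
        """Write unconditionally: replaying the call leaves the same state."""
        table[key] = value


    def insert(table, key, value):
        """Write only if the key is absent: a replay after a successful
        first attempt fails instead of silently succeeding."""
        if key in table:
            raise KeyError(f"duplicate key {key}")
        table[key] = value


    events = {}
    upsert(events, 42, "payload")
    upsert(events, 42, "payload")  # replay: state unchanged, no error
    print(events)

    replay_rejected = False
    try:
        insert(events, 42, "payload")  # replay of a write that already landed
    except KeyError:
        replay_rejected = True
    print("replay rejected:", replay_rejected)

In both cases the table ends up with exactly one row; the difference is only whether the replay is reported to the caller as an error.
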
Common Patterns
---------------
Fail fast on connection errors during startup:
.. code-block:: python

    try:
        driver.wait(timeout=5, fail_fast=True)
    except TimeoutError:
        raise SystemExit("Could not reach YDB: check endpoint and credentials")

Distinguish schema errors (fix the code) from transient errors (retry):
.. code-block:: python

    try:
        pool.execute_with_retries("SELECT * FROM nonexistent_table")
    except ydb.SchemeError as e:
        raise RuntimeError(f"Schema problem: {e}") from e
    except ydb.Error:
        # execute_with_retries has already retried transient errors; anything
        # that reaches this point exhausted its retries or is not retriable.
        raise

YDB API Reference
=================
.. toctree::
:caption: Contents:
.. module:: ydb
Driver
------
DriverConfig
^^^^^^^^^^^^
.. autoclass:: ydb.DriverConfig
:members:
:inherited-members:
:undoc-members:
:exclude-members: database, ca_cert, channel_options, secure_channel, endpoint, endpoints, credentials, use_all_nodes, root_certificates, certificate_chain, private_key, grpc_keep_alive_timeout, table_client_settings, primary_user_agent
Driver
^^^^^^
.. autoclass:: ydb.Driver
:members:
:inherited-members:
:undoc-members:
Driver (AsyncIO)
^^^^^^^^^^^^^^^^
.. autoclass:: ydb.aio.Driver
:members:
:inherited-members:
:undoc-members:
------------------------
Credentials
-----------
AnonymousCredentials
^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.AnonymousCredentials
:members:
:undoc-members:
AccessTokenCredentials
^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.AccessTokenCredentials
:members:
:undoc-members:
StaticCredentials
^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.StaticCredentials
:members:
:undoc-members:
------------------------
Common
------
BaseRequestSettings
^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.BaseRequestSettings
:members:
:inherited-members:
:undoc-members:
:exclude-members: trace_id, request_type, timeout, cancel_after, operation_timeout, compression, need_rpc_auth, headers, make_copy, tracer
RetrySettings
^^^^^^^^^^^^^
.. autoclass:: ydb.RetrySettings
:members:
:inherited-members:
:undoc-members:
BackoffSettings
^^^^^^^^^^^^^^^
.. autoclass:: ydb.BackoffSettings
:members:
:undoc-members:
Result Sets
^^^^^^^^^^^
.. autoclass:: ydb.convert._ResultSet
:members:
:inherited-members:
:undoc-members:
------------------------
Types
-----
PrimitiveType
^^^^^^^^^^^^^
.. autoclass:: ydb.PrimitiveType
:members:
:undoc-members:
TypedValue
^^^^^^^^^^
.. autoclass:: ydb.TypedValue
:members:
:undoc-members:
OptionalType
^^^^^^^^^^^^
.. autoclass:: ydb.OptionalType
:members:
:undoc-members:
ListType
^^^^^^^^
.. autoclass:: ydb.ListType
:members:
:undoc-members:
DictType
^^^^^^^^
.. autoclass:: ydb.DictType
:members:
:undoc-members:
StructType
^^^^^^^^^^
.. autoclass:: ydb.StructType
:members:
:undoc-members:
TupleType
^^^^^^^^^
.. autoclass:: ydb.TupleType
:members:
:undoc-members:
DecimalType
^^^^^^^^^^^
.. autoclass:: ydb.DecimalType
:members:
:undoc-members:
------------------------
Query Service
-------------
QueryClientSettings
^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.QueryClientSettings
:members:
:inherited-members:
:undoc-members:
QuerySessionPool
^^^^^^^^^^^^^^^^
.. autoclass:: ydb.QuerySessionPool
:members:
:inherited-members:
:undoc-members:
QuerySession
^^^^^^^^^^^^
.. autoclass:: ydb.QuerySession
:members:
:inherited-members:
:undoc-members:
QueryTxContext
^^^^^^^^^^^^^^
.. autoclass:: ydb.QueryTxContext
:members:
:inherited-members:
:undoc-members:
QuerySessionPool (AsyncIO)
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.aio.QuerySessionPool
:members:
:inherited-members:
:undoc-members:
QuerySession (AsyncIO)
^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.aio.QuerySession
:members:
:inherited-members:
:undoc-members:
QueryTxContext (AsyncIO)
^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.aio.QueryTxContext
:members:
:inherited-members:
:undoc-members:
Transaction Modes
^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.BaseQueryTxMode
:members:
:inherited-members:
:undoc-members:
:exclude-members: name, to_proto
.. autoclass:: ydb.QuerySerializableReadWrite
:members:
:inherited-members:
:undoc-members:
:exclude-members: name, to_proto
.. autoclass:: ydb.QuerySnapshotReadOnly
:members:
:inherited-members:
:undoc-members:
:exclude-members: name, to_proto
.. autoclass:: ydb.QuerySnapshotReadWrite
:members:
:inherited-members:
:undoc-members:
:exclude-members: name, to_proto
.. autoclass:: ydb.QueryOnlineReadOnly
:members:
:inherited-members:
:undoc-members:
:exclude-members: name, to_proto
.. autoclass:: ydb.QueryStaleReadOnly
:members:
:inherited-members:
:undoc-members:
:exclude-members: name, to_proto
Query Settings
^^^^^^^^^^^^^^
.. autoclass:: ydb.QueryStatsMode
:members:
:undoc-members:
.. autoclass:: ydb.QueryResultSetFormat
:members:
:undoc-members:
.. autoclass:: ydb.QueryExplainResultFormat
:members:
:undoc-members:
.. autoclass:: ydb.ArrowFormatSettings
:members:
:undoc-members:
------------------------
Table Service
-------------
TableClient
^^^^^^^^^^^
.. autoclass:: ydb.TableClient
:members:
:inherited-members:
:undoc-members:
TableClientSettings
^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.TableClientSettings
:members:
:inherited-members:
:undoc-members:
Session Pool
^^^^^^^^^^^^
.. autoclass:: ydb.SessionPool
:members:
:inherited-members:
:undoc-members:
Session
^^^^^^^
.. autoclass:: ydb.Session
:members:
:inherited-members:
:undoc-members:
Transaction Context
^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.TxContext
:members:
:inherited-members:
:undoc-members:
DataQuery
^^^^^^^^^
.. autoclass:: ydb.DataQuery
:members:
:inherited-members:
:undoc-members:
TableDescription
^^^^^^^^^^^^^^^^
.. autoclass:: ydb.TableDescription
:members:
:undoc-members:
Column
^^^^^^
.. autoclass:: ydb.Column
:members:
:undoc-members:
TableIndex
^^^^^^^^^^
.. autoclass:: ydb.TableIndex
:members:
:undoc-members:
TableSchemeEntry
^^^^^^^^^^^^^^^^
.. autoclass:: ydb.TableSchemeEntry
:members:
:undoc-members:
DescribeTableSettings
^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.DescribeTableSettings
:members:
:undoc-members:
KeyRange
^^^^^^^^
.. autoclass:: ydb.KeyRange
:members:
:undoc-members:
KeyBound
^^^^^^^^
.. autoclass:: ydb.KeyBound
:members:
:undoc-members:
BulkUpsertColumns
^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.BulkUpsertColumns
:members:
:undoc-members:
TtlSettings
^^^^^^^^^^^
.. autoclass:: ydb.TtlSettings
:members:
:undoc-members:
PartitioningSettings
^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.PartitioningSettings
:members:
:undoc-members:
ColumnFamily
^^^^^^^^^^^^
.. autoclass:: ydb.ColumnFamily
:members:
:undoc-members:
FeatureFlag
^^^^^^^^^^^
.. autoclass:: ydb.FeatureFlag
:members:
:undoc-members:
ColumnUnit
^^^^^^^^^^
.. autoclass:: ydb.ColumnUnit
:members:
:undoc-members:
Compression
^^^^^^^^^^^
.. autoclass:: ydb.Compression
:members:
:undoc-members:
--------------------------
Topic Service
-------------
TopicClient
^^^^^^^^^^^
.. autoclass:: ydb.TopicClient
:members:
:inherited-members:
:undoc-members:
TopicClientSettings
^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.TopicClientSettings
:members:
:inherited-members:
:undoc-members:
TopicConsumer
^^^^^^^^^^^^^
.. autoclass:: ydb.TopicConsumer
:members:
:undoc-members:
TopicCodec
^^^^^^^^^^
.. autoclass:: ydb.TopicCodec
:members:
:undoc-members:
TopicWriterMessage
^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.TopicWriterMessage
:members:
:undoc-members:
TopicReaderSelector
^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.TopicReaderSelector
:members:
:undoc-members:
TopicAutoPartitioningSettings
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.TopicAutoPartitioningSettings
:members:
:undoc-members:
TopicAutoPartitioningStrategy
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.TopicAutoPartitioningStrategy
:members:
:undoc-members:
--------------------------
Coordination Service
--------------------
NodeConfig
^^^^^^^^^^
.. autoclass:: ydb.coordination.NodeConfig
:members:
:undoc-members:
ConsistencyMode
^^^^^^^^^^^^^^^
.. autoclass:: ydb.coordination.ConsistencyMode
:members:
:undoc-members:
RateLimiterCountersMode
^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: ydb.coordination.RateLimiterCountersMode
:members:
:undoc-members:
--------------------------
Scheme
------
SchemeClient
^^^^^^^^^^^^
.. autoclass:: ydb.SchemeClient
:members:
:inherited-members:
:undoc-members:
SchemeEntry
^^^^^^^^^^^
.. autoclass:: ydb.SchemeEntry
:members:
:undoc-members:
SchemeEntryType
^^^^^^^^^^^^^^^
.. autoclass:: ydb.SchemeEntryType
:members:
:undoc-members:
--------------------------
Errors
------
.. autoclass:: ydb.Error
:members:
:undoc-members:
.. autoclass:: ydb.BadRequest
:undoc-members:
.. autoclass:: ydb.Unauthorized
:undoc-members:
.. autoclass:: ydb.Unauthenticated
:undoc-members:
.. autoclass:: ydb.Aborted
:undoc-members:
.. autoclass:: ydb.Unavailable
:undoc-members:
.. autoclass:: ydb.Overloaded
:undoc-members:
.. autoclass:: ydb.SchemeError
:undoc-members:
.. autoclass:: ydb.GenericError
:undoc-members:
.. autoclass:: ydb.Timeout
:undoc-members:
.. autoclass:: ydb.BadSession
:undoc-members:
.. autoclass:: ydb.AlreadyExists
:undoc-members:
.. autoclass:: ydb.NotFound
:undoc-members:
.. autoclass:: ydb.Undetermined
:undoc-members:
.. autoclass:: ydb.ConnectionError
:undoc-members:
.. autoclass:: ydb.ConnectionFailure
:undoc-members:
.. autoclass:: ydb.ConnectionLost
:undoc-members:
.. autoclass:: ydb.SessionPoolEmpty
:undoc-members:
.. autoclass:: ydb.DeadlineExceed
:undoc-members: