Query Service

The Query service is the primary API for executing YQL queries in YDB. It supports both DDL (CREATE TABLE, DROP TABLE) and DML (SELECT, INSERT, UPDATE, DELETE) through a single interface with full transaction support.

QuerySessionPool

QuerySessionPool manages a pool of server-side sessions and provides safe, retriable execution of queries. This is the main entry point for the Query service.

Create a pool:

import ydb

with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5, fail_fast=True)

    with ydb.QuerySessionPool(driver) as pool:
        ...  # use pool

The pool is closed automatically when used as a context manager. Call pool.stop() explicitly if not using with.

Async pool:

import asyncio
import ydb

async def main():
    async with ydb.aio.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
        await driver.wait(timeout=5, fail_fast=True)

        async with ydb.aio.QuerySessionPool(driver) as pool:
            ...  # use pool

asyncio.run(main())

execute_with_retries

The simplest way to run a query. It buffers all result sets into memory, retries on transient errors, and runs outside any interactive transaction:

# DDL
pool.execute_with_retries("CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))")

# DML
pool.execute_with_retries(
    "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')"
)

# Query with results
result_sets = pool.execute_with_retries("SELECT id, name FROM users")
for row in result_sets[0].rows:
    print(row["id"], row["name"])

Note

execute_with_retries loads the entire result set into memory before returning. For large result sets use retry_operation_sync with streaming iteration instead.

Async:

result_sets = await pool.execute_with_retries("SELECT id, name FROM users")

Parameterized Queries

Always use parameters instead of string interpolation to avoid SQL injection and to allow the server to cache query plans.

Implicit types — Python values are mapped to YDB types automatically:

pool.execute_with_retries(
    "DECLARE $id AS Uint64; SELECT * FROM users WHERE id = $id",
    parameters={"$id": 42},
)

Explicit types via tuple — pass (value, ydb_type) when the automatic mapping is ambiguous or you need a specific type:

pool.execute_with_retries(
    "DECLARE $ids AS List<Int64>; SELECT * FROM users WHERE id IN $ids",
    parameters={"$ids": ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64))},
)

Explicit types via TypedValue:

pool.execute_with_retries(
    "DECLARE $id AS Int64; SELECT * FROM users WHERE id = $id",
    parameters={"$id": ydb.TypedValue(42, ydb.PrimitiveType.Int64)},
)

Common PrimitiveType values: Bool, Int32, Int64, Uint32, Uint64, Float, Double, String, Utf8, Json, Timestamp, Date, Datetime.

retry_operation_sync

Use this when you need manual control over a session — for example, to stream large result sets or to execute multiple queries in sequence on the same session:

def callee(session: ydb.QuerySession):
    with session.execute("SELECT COUNT(*) AS cnt FROM users") as results:
        for result_set in results:
            print(result_set.rows[0]["cnt"])

pool.retry_operation_sync(callee)

The pool acquires a session, calls callee(session), and retries the whole call on retriable errors (e.g. UNAVAILABLE, ABORTED).

Async:

async def callee(session: ydb.aio.QuerySession):
    async with await session.execute("SELECT COUNT(*) AS cnt FROM users") as results:
        async for result_set in results:
            print(result_set.rows[0]["cnt"])

await pool.retry_operation_async(callee)
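Conceptually, the retry helpers behave like this server-free sketch (heavily simplified: the real pool classifies ydb error types, applies exponential backoff with jitter, and manages session lifetimes):

```python
import time

RETRIABLE = {"UNAVAILABLE", "ABORTED"}   # stand-ins for retriable statuses

def retry_operation_sketch(callee, acquire_session, max_retries=5):
    # Model of pool.retry_operation_sync: acquire a session, run the
    # whole callee, and re-run it from scratch on a retriable error.
    for attempt in range(max_retries):
        session = acquire_session()
        try:
            return callee(session)
        except RuntimeError as err:      # stand-in for ydb.Error
            if str(err) not in RETRIABLE or attempt == max_retries - 1:
                raise
            time.sleep(0)                # real code: exponential backoff
```

The important property, shared with the real helper, is that callee may run more than once.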

retry_tx_sync

Use this to execute a group of queries atomically within a transaction. The pool handles session acquisition, transaction begin, commit, and retries:

def callee(tx: ydb.QueryTxContext):
    with tx.execute("INSERT INTO users (id, name) VALUES (3, 'Carol')"):
        pass

pool.retry_tx_sync(callee)

If callee raises a retriable error the whole function (including the transaction) is re-executed from scratch — make sure callee is idempotent or relies on the transaction’s atomicity for correctness.

Async:

async def callee(tx: ydb.aio.QueryTxContext):
    async with await tx.execute("INSERT INTO users (id, name) VALUES (3, 'Carol')"):
        pass

await pool.retry_tx_async(callee)

With an explicit transaction mode:

pool.retry_tx_sync(callee, tx_mode=ydb.QuerySnapshotReadOnly())
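Regarding the idempotency warning above: one way to make the transaction body safe to re-run is UPSERT, which writes by primary key, so executing the whole function twice leaves the same final state. A minimal sketch (the tx type is noted in a comment so the snippet needs no imports):

```python
# Sketch: UPSERT is idempotent per primary key, so a retried transaction
# cannot create duplicate rows the way a plain INSERT can.
def upsert_carol(tx):  # tx: ydb.QueryTxContext
    with tx.execute(
        "UPSERT INTO users (id, name) VALUES (3, 'Carol')",
        commit_tx=True,
    ):
        pass
```

Run it like any other transaction body: pool.retry_tx_sync(upsert_carol).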

Transactions

Transaction Modes

- QuerySerializableReadWrite: full ACID serializable isolation. The default. Supports reads and writes.
- QuerySnapshotReadOnly: consistent read-only snapshot taken at transaction start.
- QuerySnapshotReadWrite: snapshot reads with write support; write conflicts are possible.
- QueryOnlineReadOnly: each read returns the most recent data at execution time. No cross-read consistency.
- QueryOnlineReadOnly (allow_inconsistent_reads=True): fastest reads; even individual reads may be slightly inconsistent.
- QueryStaleReadOnly: reads may lag by fractions of a second. Each individual read is consistent.

Manual Transaction Control

Use session.transaction() when you need fine-grained control:

Synchronous:

def callee(session: ydb.QuerySession):
    with session.transaction() as tx:
        tx.begin()

        with tx.execute("INSERT INTO users (id, name) VALUES (4, 'Dave')"):
            pass

        with tx.execute("SELECT COUNT(*) AS cnt FROM users") as results:
            for result_set in results:
                print("count:", result_set.rows[0]["cnt"])

        tx.commit()
        # tx.rollback() to discard changes instead

pool.retry_operation_sync(callee)

Commit on the last query to avoid an extra round-trip:

def callee(session: ydb.QuerySession):
    with session.transaction() as tx:
        tx.begin()

        with tx.execute(
            "INSERT INTO users (id, name) VALUES (5, 'Eve')",
            commit_tx=True,
        ):
            pass

pool.retry_operation_sync(callee)

Async:

async def callee(session: ydb.aio.QuerySession):
    async with session.transaction() as tx:
        await tx.begin()

        async with await tx.execute(
            "INSERT INTO users (id, name) VALUES (4, 'Dave')"
        ):
            pass

        async with await tx.execute(
            "SELECT COUNT(*) AS cnt FROM users"
        ) as results:
            async for result_set in results:
                print("count:", result_set.rows[0]["cnt"])

        await tx.commit()

await pool.retry_operation_async(callee)

Note

The transaction context manager calls rollback() automatically on exit if you did not commit explicitly.

Working with Result Sets

session.execute() and tx.execute() return an iterator of ResultSet objects. A single query may return multiple result sets (e.g. when it contains multiple SELECT statements).

Synchronous iteration:

with session.execute("SELECT id, name FROM users ORDER BY id") as results:
    for result_set in results:
        for row in result_set.rows:
            print(row["id"], row["name"])  # name is Utf8, already a str

Async iteration:

async with await session.execute("SELECT id, name FROM users") as results:
    async for result_set in results:
        for row in result_set.rows:
            print(row["id"], row["name"])  # Utf8 values arrive as str
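As noted above, a query containing several statements yields several result sets, in statement order. A hedged sketch (it works with any session exposing the execute context manager shown in this section):

```python
# Sketch: one query containing two SELECTs -> two result sets.
def count_and_list(session):  # session: ydb.QuerySession
    query = """
    SELECT COUNT(*) AS cnt FROM users;
    SELECT id FROM users ORDER BY id;
    """
    with session.execute(query) as results:
        return [result_set.rows for result_set in results]
```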

ResultSet fields:

result_set.rows      # list of Row objects — access columns by name: row["col"]
result_set.columns   # list of column descriptors with .name and .type
result_set.truncated # True if the server truncated results (use pagination)

Note

String columns return bytes; Utf8 columns return str. Decode String values as needed: row["col"].decode("utf-8").
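When truncated is True, a common remedy is keyset pagination: order by the primary key and re-query from the last key seen. This server-free sketch shows the loop shape with a stubbed fetch_page; the query text in the comment and the $last_id parameter name are illustrative assumptions:

```python
# Server-free sketch of keyset pagination over the primary key.
# fetch_page stands in for a parameterized SELECT such as:
#   SELECT id, name FROM users WHERE id > $last_id ORDER BY id LIMIT $page_size
def paginate(fetch_page, page_size=1000):
    last_id = 0
    while True:
        rows = fetch_page(last_id, page_size)
        if not rows:
            return
        yield from rows
        if len(rows) < page_size:   # short page means no more data
            return
        last_id = rows[-1]["id"]    # resume after the last key seen
```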

DDL Queries

DDL is executed the same way as DML — through the pool or a session:

pool.execute_with_retries(
    """
    CREATE TABLE orders (
        order_id Uint64,
        user_id  Uint64,
        amount   Double,
        created  Timestamp,
        PRIMARY KEY (order_id)
    )
    """
)

pool.execute_with_retries("DROP TABLE IF EXISTS orders")

Complete Examples

Synchronous

import ydb

with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5, fail_fast=True)

    with ydb.QuerySessionPool(driver) as pool:

        # Schema
        pool.execute_with_retries("DROP TABLE IF EXISTS users")
        pool.execute_with_retries(
            "CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
        )

        # Insert in a transaction
        def insert(tx: ydb.QueryTxContext):
            with tx.execute(
                "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
                commit_tx=True,
            ):
                pass

        pool.retry_tx_sync(insert)

        # Read
        result_sets = pool.execute_with_retries("SELECT id, name FROM users")
        for row in result_sets[0].rows:
            print(row["id"], row["name"])

Asynchronous

import asyncio
import ydb

async def main():
    async with ydb.aio.Driver(
        endpoint="grpc://localhost:2136", database="/local"
    ) as driver:
        await driver.wait(timeout=5, fail_fast=True)

        async with ydb.aio.QuerySessionPool(driver) as pool:

            # Schema
            await pool.execute_with_retries("DROP TABLE IF EXISTS users")
            await pool.execute_with_retries(
                "CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
            )

            # Insert in a transaction
            async def insert(tx: ydb.aio.QueryTxContext):
                async with await tx.execute(
                    "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
                    commit_tx=True,
                ):
                    pass

            await pool.retry_tx_async(insert)

            # Read
            result_sets = await pool.execute_with_retries(
                "SELECT id, name FROM users"
            )
            for row in result_sets[0].rows:
                print(row["id"], row["name"])

asyncio.run(main())

Apache Arrow Format

By default query results are returned as YDB value objects. For analytics workloads you can request results in Apache Arrow IPC format, which integrates directly with pandas, polars, and other columnar tools. Requires pyarrow to be installed (pip install pyarrow).

import pyarrow as pa
import ydb

result_sets = pool.execute_with_retries(
    "SELECT id, name, score FROM users ORDER BY id LIMIT 1000",
    result_set_format=ydb.QueryResultSetFormat.ARROW,
)

for result_set in result_sets:
    schema = pa.ipc.read_schema(pa.py_buffer(result_set.arrow_format_meta.schema))
    batch  = pa.ipc.read_record_batch(pa.py_buffer(result_set.data), schema)
    df = batch.to_pandas()
    print(df.head())

Arrow results can also be compressed on the wire. Pass arrow_format_settings to choose a codec:

settings = ydb.ArrowFormatSettings(
    compression_codec=ydb.ArrowCompressionCodec(
        ydb.ArrowCompressionCodecType.ZSTD,
        level=10,
    )
)

result_sets = pool.execute_with_retries(
    "SELECT * FROM events LIMIT 100000",
    result_set_format=ydb.QueryResultSetFormat.ARROW,
    arrow_format_settings=settings,
)

Note

Arrow format is only useful when you process results with a columnar library. For regular row-by-row processing use the default VALUE format — it has no extra dependencies and lower overhead for small result sets.

Query Stats and Explain

Query Stats

YDB can return execution statistics alongside query results. Pass a QueryStatsMode value as stats_mode to session.execute() or tx.execute(), then read last_query_stats after the iterator is consumed:

def callee(session: ydb.QuerySession):
    with session.execute(
        "SELECT * FROM users",
        stats_mode=ydb.QueryStatsMode.BASIC,
    ) as results:
        for result_set in results:
            pass  # stats are populated only once the stream is fully consumed

    print(session.last_query_stats)

pool.retry_operation_sync(callee)

The same works inside a transaction — stats reflect the last executed statement:

def callee(tx: ydb.QueryTxContext):
    with tx.execute(
        "SELECT COUNT(*) AS cnt FROM users",
        stats_mode=ydb.QueryStatsMode.FULL,
    ) as results:
        for result_set in results:
            print(result_set.rows[0]["cnt"])

    print(tx.last_query_stats)

pool.retry_tx_sync(callee)

Stats modes:

- QueryStatsMode.NONE: no statistics (the default).
- QueryStatsMode.BASIC: row counts and execution time per stage.
- QueryStatsMode.FULL: full per-operator statistics.
- QueryStatsMode.PROFILE: full statistics plus the query execution plan. Use this for performance investigations.

Explain

explain returns the query execution plan without actually running the query. Use it to understand how YDB will execute a statement before sending it to production:

# Returns a JSON string
plan = pool.explain_with_retries("SELECT * FROM users WHERE id = 1")
print(plan)

# Returns a parsed dict
plan_dict = pool.explain_with_retries(
    "SELECT * FROM users WHERE id = 1",
    result_format=ydb.QueryExplainResultFormat.DICT,
)
print(plan_dict["Plan"])

You can also call explain directly on a session:

def callee(session: ydb.QuerySession):
    plan = session.explain("SELECT * FROM users WHERE id = 1")
    print(plan)

pool.retry_operation_sync(callee)