Query Service
The Query service is the primary API for executing YQL queries in YDB.
It supports DDL (CREATE TABLE, DROP TABLE) and DML (SELECT, INSERT,
UPDATE, DELETE) in a single unified interface with full transaction support.
QuerySessionPool
QuerySessionPool manages a pool of server-side sessions and provides safe,
retriable execution of queries. This is the main entry point for the Query service.
Create a pool:
import ydb

with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5, fail_fast=True)
    with ydb.QuerySessionPool(driver) as pool:
        ...  # use pool
The pool is closed automatically when used as a context manager. Call pool.stop()
explicitly if not using with.
Async pool:
import asyncio
import ydb

async def main():
    async with ydb.aio.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
        await driver.wait(timeout=5, fail_fast=True)
        async with ydb.aio.QuerySessionPool(driver) as pool:
            ...  # use pool

asyncio.run(main())
execute_with_retries
The simplest way to run a query. Buffers all results into memory, retries on transient errors, and runs outside any transaction:
# DDL
pool.execute_with_retries("CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))")

# DML
pool.execute_with_retries(
    "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')"
)

# Query with results
result_sets = pool.execute_with_retries("SELECT id, name FROM users")
for row in result_sets[0].rows:
    print(row["id"], row["name"])
Note
execute_with_retries loads the entire result set into memory before returning.
For large result sets use retry_operation_sync with streaming iteration instead.
Async:
result_sets = await pool.execute_with_retries("SELECT id, name FROM users")
Parameterized Queries
Always use parameters instead of string interpolation to avoid SQL injection and to allow the server to cache query plans.
Implicit types — Python values are mapped to YDB types automatically:
pool.execute_with_retries(
    "DECLARE $id AS Uint64; SELECT * FROM users WHERE id = $id",
    parameters={"$id": 42},
)
Explicit types via tuple — pass (value, ydb_type) when the automatic mapping
is ambiguous or you need a specific type:
pool.execute_with_retries(
    "DECLARE $ids AS List<Int64>; SELECT * FROM users WHERE id IN $ids",
    parameters={"$ids": ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64))},
)
Explicit types via TypedValue:
pool.execute_with_retries(
    "DECLARE $id AS Int64; SELECT * FROM users WHERE id = $id",
    parameters={"$id": ydb.TypedValue(42, ydb.PrimitiveType.Int64)},
)
Common PrimitiveType values: Bool, Int32, Int64, Uint32,
Uint64, Float, Double, String, Utf8, Json, Timestamp,
Date, Datetime.
retry_operation_sync
Use this when you need manual control over a session — for example, to stream large result sets or to execute multiple queries in sequence on the same session:
def callee(session: ydb.QuerySession):
    with session.execute("SELECT COUNT(*) AS cnt FROM users") as results:
        for result_set in results:
            print(result_set.rows[0]["cnt"])

pool.retry_operation_sync(callee)
The pool acquires a session, calls callee(session), and retries the whole call on
retriable errors (e.g. UNAVAILABLE, ABORTED).
Async:
async def callee(session: ydb.aio.QuerySession):
    async with await session.execute("SELECT COUNT(*) AS cnt FROM users") as results:
        async for result_set in results:
            print(result_set.rows[0]["cnt"])

await pool.retry_operation_async(callee)
retry_tx_sync
Use this to execute a group of queries atomically within a transaction. The pool handles session acquisition, transaction begin, commit, and retries:
def callee(tx: ydb.QueryTxContext):
    with tx.execute("INSERT INTO users (id, name) VALUES (3, 'Carol')"):
        pass

pool.retry_tx_sync(callee)
If callee raises a retriable error the whole function (including the transaction)
is re-executed from scratch — make sure callee is idempotent or relies on the
transaction’s atomicity for correctness.
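One way to make such a callee idempotent is YQL's UPSERT, which inserts or replaces a row by primary key, so re-running the transaction cannot fail on a duplicate key the way INSERT can. A sketch, assuming the `users` table from above:

```python
def upsert_carol(tx):
    # UPSERT writes the row whether or not it already exists, making the
    # whole transaction safe to re-execute after a retriable error.
    with tx.execute(
        "DECLARE $id AS Uint64; DECLARE $name AS Utf8; "
        "UPSERT INTO users (id, name) VALUES ($id, $name)",
        parameters={"$id": 3, "$name": "Carol"},
    ):
        pass

# pool.retry_tx_sync(upsert_carol)
```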
Async:
async def callee(tx: ydb.aio.QueryTxContext):
    async with await tx.execute("INSERT INTO users (id, name) VALUES (3, 'Carol')"):
        pass

await pool.retry_tx_async(callee)
With an explicit transaction mode:
pool.retry_tx_sync(callee, tx_mode=ydb.QuerySnapshotReadOnly())
Transactions
Transaction Modes
| Mode | Description |
|---|---|
| Serializable read/write (`ydb.QuerySerializableReadWrite()`) | Full ACID serializable isolation. Default. Supports reads and writes. |
| Snapshot read-only (`ydb.QuerySnapshotReadOnly()`) | Consistent read-only snapshot taken at transaction start. |
| Snapshot read/write | Snapshot reads with write support; write conflicts are possible. |
| Online read-only (`ydb.QueryOnlineReadOnly()`) | Each read returns the most recent data at execution time. No cross-read consistency. |
| Online read-only, inconsistent reads allowed | Fastest reads; even individual reads may be slightly inconsistent. |
| Stale read-only (`ydb.QueryStaleReadOnly()`) | Reads may lag by fractions of a second. Each individual read is consistent. |
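For example, a read that tolerates slightly stale data can run at a cheaper consistency level by passing the mode to `retry_tx_sync`. A sketch, assuming the `pool` and `users` table from above:

```python
def read_users(tx):
    # The consistency level is chosen by the caller via tx_mode, not here.
    with tx.execute("SELECT id, name FROM users ORDER BY id") as results:
        for result_set in results:
            for row in result_set.rows:
                print(row["id"], row["name"])

# Stale reads may lag by fractions of a second, but each read is consistent:
# pool.retry_tx_sync(read_users, tx_mode=ydb.QueryStaleReadOnly())
```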
Manual Transaction Control
Use session.transaction() when you need fine-grained control:
Synchronous:
def callee(session: ydb.QuerySession):
    with session.transaction() as tx:
        tx.begin()
        with tx.execute("INSERT INTO users (id, name) VALUES (4, 'Dave')"):
            pass
        with tx.execute("SELECT COUNT(*) AS cnt FROM users") as results:
            for result_set in results:
                print("count:", result_set.rows[0]["cnt"])
        tx.commit()
        # tx.rollback() to discard changes instead

pool.retry_operation_sync(callee)
Commit on last query — avoids an extra round-trip:
def callee(session: ydb.QuerySession):
    with session.transaction() as tx:
        tx.begin()
        with tx.execute(
            "INSERT INTO users (id, name) VALUES (5, 'Eve')",
            commit_tx=True,
        ):
            pass

pool.retry_operation_sync(callee)
Async:
async def callee(session: ydb.aio.QuerySession):
    async with session.transaction() as tx:
        await tx.begin()
        async with await tx.execute(
            "INSERT INTO users (id, name) VALUES (4, 'Dave')"
        ):
            pass
        async with await tx.execute(
            "SELECT COUNT(*) AS cnt FROM users"
        ) as results:
            async for result_set in results:
                print("count:", result_set.rows[0]["cnt"])
        await tx.commit()

await pool.retry_operation_async(callee)
Note
The transaction context manager calls rollback() automatically on exit if you
did not commit explicitly.
Working with Result Sets
session.execute() and tx.execute() return an iterator of ResultSet objects.
A single query may return multiple result sets (e.g. when it contains multiple SELECT
statements).
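For instance, a query text containing two SELECT statements yields two result sets, in statement order. A sketch, assuming the `users` and `orders` tables from this page:

```python
def two_counts(session):
    with session.execute(
        "SELECT COUNT(*) AS users_cnt FROM users; "
        "SELECT COUNT(*) AS orders_cnt FROM orders;"
    ) as results:
        # Result set 0 comes from the first SELECT, result set 1 from the second.
        for index, result_set in enumerate(results):
            print(index, result_set.rows[0])

# pool.retry_operation_sync(two_counts)
```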
Synchronous iteration:
with session.execute("SELECT id, name FROM users ORDER BY id") as results:
    for result_set in results:
        for row in result_set.rows:
            print(row["id"], row["name"].decode())
Async iteration:
async with await session.execute("SELECT id, name FROM users") as results:
    async for result_set in results:
        for row in result_set.rows:
            print(row["id"], row["name"].decode())
ResultSet fields:
result_set.rows # list of Row objects — access columns by name: row["col"]
result_set.columns # list of column descriptors with .name and .type
result_set.truncated # True if the server truncated results (use pagination)
Note
String columns return bytes; Utf8 columns return str.
Decode as needed: row["name"].decode("utf-8").
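When `truncated` is set, page through the table by its primary key instead of retrying a bigger read. A keyset-pagination sketch, assuming the `pool` and `users` table from above (the helper name and page size are illustrative):

```python
def fetch_page(pool, last_id, page_size=1000):
    # Each page continues strictly after the last primary key seen,
    # so consecutive pages never overlap.
    return pool.execute_with_retries(
        "DECLARE $last AS Uint64; DECLARE $limit AS Uint64; "
        "SELECT id, name FROM users WHERE id > $last "
        "ORDER BY id LIMIT $limit",
        parameters={"$last": last_id, "$limit": page_size},
    )

# last_id = 0
# while True:
#     rows = fetch_page(pool, last_id)[0].rows
#     if not rows:
#         break
#     for row in rows:
#         print(row["id"], row["name"])
#     last_id = rows[-1]["id"]
```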
DDL Queries
DDL is executed the same way as DML — through the pool or a session:
pool.execute_with_retries(
    """
    CREATE TABLE orders (
        order_id Uint64,
        user_id Uint64,
        amount Double,
        created Timestamp,
        PRIMARY KEY (order_id)
    )
    """
)

pool.execute_with_retries("DROP TABLE IF EXISTS orders")
Complete Examples
Synchronous
import ydb

with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5, fail_fast=True)
    with ydb.QuerySessionPool(driver) as pool:
        # Schema
        pool.execute_with_retries("DROP TABLE IF EXISTS users")
        pool.execute_with_retries(
            "CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
        )

        # Insert in a transaction
        def insert(tx: ydb.QueryTxContext):
            with tx.execute(
                "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
                commit_tx=True,
            ):
                pass

        pool.retry_tx_sync(insert)

        # Read
        result_sets = pool.execute_with_retries("SELECT id, name FROM users")
        for row in result_sets[0].rows:
            print(row["id"], row["name"])
Asynchronous
import asyncio
import ydb

async def main():
    async with ydb.aio.Driver(
        endpoint="grpc://localhost:2136", database="/local"
    ) as driver:
        await driver.wait(timeout=5, fail_fast=True)
        async with ydb.aio.QuerySessionPool(driver) as pool:
            # Schema
            await pool.execute_with_retries("DROP TABLE IF EXISTS users")
            await pool.execute_with_retries(
                "CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
            )

            # Insert in a transaction
            async def insert(tx: ydb.aio.QueryTxContext):
                async with await tx.execute(
                    "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
                    commit_tx=True,
                ):
                    pass

            await pool.retry_tx_async(insert)

            # Read
            result_sets = await pool.execute_with_retries(
                "SELECT id, name FROM users"
            )
            for row in result_sets[0].rows:
                print(row["id"], row["name"])

asyncio.run(main())
Apache Arrow Format
By default query results are returned as YDB value objects. For analytics workloads
you can request results in Apache Arrow IPC format,
which integrates directly with pandas, polars, and other columnar tools. Requires
pyarrow to be installed (pip install pyarrow).
import pyarrow as pa
import ydb

result_sets = pool.execute_with_retries(
    "SELECT id, name, score FROM users ORDER BY id LIMIT 1000",
    result_set_format=ydb.QueryResultSetFormat.ARROW,
)

for result_set in result_sets:
    schema = pa.ipc.read_schema(pa.py_buffer(result_set.arrow_format_meta.schema))
    batch = pa.ipc.read_record_batch(pa.py_buffer(result_set.data), schema)
    df = batch.to_pandas()
    print(df.head())
Arrow results can also be compressed on the wire. Pass arrow_format_settings to
choose a codec:
settings = ydb.ArrowFormatSettings(
    compression_codec=ydb.ArrowCompressionCodec(
        ydb.ArrowCompressionCodecType.ZSTD,
        level=10,
    )
)

result_sets = pool.execute_with_retries(
    "SELECT * FROM events LIMIT 100000",
    result_set_format=ydb.QueryResultSetFormat.ARROW,
    arrow_format_settings=settings,
)
Note
Arrow format is only useful when you process results with a columnar library.
For regular row-by-row processing use the default VALUE format — it has
no extra dependencies and lower overhead for small result sets.
Query Stats and Explain
Query Stats
YDB can return execution statistics alongside query results. Pass a QueryStatsMode
value as stats_mode to session.execute() or tx.execute(), then read
last_query_stats after the iterator is consumed:
def callee(session: ydb.QuerySession):
    with session.execute(
        "SELECT * FROM users",
        stats_mode=ydb.QueryStatsMode.BASIC,
    ) as results:
        for result_set in results:
            pass  # stats are populated only once the iterator is fully consumed
    print(session.last_query_stats)

pool.retry_operation_sync(callee)
The same works inside a transaction — stats reflect the last executed statement:
def callee(tx: ydb.QueryTxContext):
    with tx.execute(
        "SELECT COUNT(*) AS cnt FROM users",
        stats_mode=ydb.QueryStatsMode.FULL,
    ) as results:
        for result_set in results:
            print(result_set.rows[0]["cnt"])
    print(tx.last_query_stats)

pool.retry_tx_sync(callee)
Stats modes:
| Mode | What is returned |
|---|---|
| `ydb.QueryStatsMode.NONE` | No statistics (default). |
| `ydb.QueryStatsMode.BASIC` | Row counts and execution time per stage. |
| `ydb.QueryStatsMode.FULL` | Full per-operator statistics. |
| `ydb.QueryStatsMode.PROFILE` | Full statistics plus the query execution plan. Use this for performance investigations. |
Explain
explain returns the query execution plan without actually running the query.
Use it to understand how YDB will execute a statement before sending it to
production:
# Returns a JSON string
plan = pool.explain_with_retries("SELECT * FROM users WHERE id = 1")
print(plan)
# Returns a parsed dict
plan_dict = pool.explain_with_retries(
    "SELECT * FROM users WHERE id = 1",
    result_format=ydb.QueryExplainResultFormat.DICT,
)
print(plan_dict["Plan"])
You can also call explain directly on a session:
def callee(session: ydb.QuerySession):
    plan = session.explain("SELECT * FROM users WHERE id = 1")
    print(plan)

pool.retry_operation_sync(callee)