Topic Service
YDB Topic Service is a persistent message queue that supports multiple producers and consumers, horizontal scaling via partitioning, and at-least-once delivery semantics.
The Python SDK provides both synchronous and asynchronous clients with identical APIs.
Concepts
Topic — a named stream of messages divided into one or more partitions.
Partition — an ordered, append-only log. Messages within a partition are delivered in order.
Producer — a client that writes messages to a topic. Each producer has a producer_id
that is used to deduplicate messages via sequence numbers (seqno).
Consumer — a named group of readers that share offset state. Each consumer independently tracks its position in every partition, so multiple consumer groups can read the same topic without interfering with each other.
Getting a Topic Client
The topic client is available on every driver instance via the topic_client property.
Synchronous:
import ydb
with ydb.Driver(
endpoint="grpc://localhost:2136",
database="/local",
credentials=ydb.credentials_from_env_variables(),
) as driver:
driver.wait(timeout=5, fail_fast=True)
topic_client = driver.topic_client
Asynchronous:
import asyncio
import ydb
async def main():
async with ydb.aio.Driver(
endpoint="grpc://localhost:2136",
database="/local",
credentials=ydb.credentials_from_env_variables(),
) as driver:
await driver.wait(timeout=5, fail_fast=True)
topic_client = driver.topic_client
Topic Management
Create a Topic
# Synchronous
driver.topic_client.create_topic(
"/local/my-topic",
min_active_partitions=1,
max_active_partitions=10,
retention_period=datetime.timedelta(hours=24),
consumers=["my-consumer"],
)
# Asynchronous
await driver.topic_client.create_topic(
"/local/my-topic",
min_active_partitions=1,
max_active_partitions=10,
retention_period=datetime.timedelta(hours=24),
consumers=["my-consumer"],
)
Key parameters for create_topic:
path — full topic path including database prefix.
min_active_partitions — minimum number of active partitions (default: 1).
max_active_partitions — maximum number of partitions when auto-scaling is enabled.
retention_period — how long messages are kept (datetime.timedelta).
retention_storage_mb — maximum storage size per partition in megabytes.
supported_codecs — list of TopicCodec values the topic accepts (default: RAW and GZIP).
consumers — list of consumer names (strings) or TopicConsumer objects to create upfront.
partition_write_speed_bytes_per_second — per-partition write throughput limit.
Alter a Topic
# Add a consumer
driver.topic_client.alter_topic(
"/local/my-topic",
add_consumers=["new-consumer"],
)
# Change retention
driver.topic_client.alter_topic(
"/local/my-topic",
set_retention_period=datetime.timedelta(hours=48),
)
Drop a Topic
driver.topic_client.drop_topic("/local/my-topic")
Describe a Topic
description = driver.topic_client.describe_topic("/local/my-topic")
print(description.partitions)
print(description.consumers)
# Include partition-level statistics
description = driver.topic_client.describe_topic(
"/local/my-topic",
include_stats=True,
)
Writing Messages
Create a Writer
Use topic_client.writer() as a context manager (recommended) or create it manually.
Synchronous:
with driver.topic_client.writer("/local/my-topic") as writer:
writer.write("hello")
Asynchronous:
async with driver.topic_client.writer("/local/my-topic") as writer:
await writer.write("hello")
Without context manager (you must call close() yourself):
writer = driver.topic_client.writer("/local/my-topic")
try:
writer.write("hello")
finally:
writer.close()
Writer Parameters
writer = driver.topic_client.writer(
"/local/my-topic",
producer_id="my-producer", # Unique producer ID; auto-generated UUID if omitted.
partition_id=0, # Pin to a specific partition; None = auto-select.
codec=ydb.TopicCodec.GZIP, # Compress messages. Default: RAW (no compression).
auto_seqno=True, # Auto-increment sequence numbers (default: True).
auto_created_at=True, # Auto-set message timestamps (default: True).
)
producer_id is used for deduplication: if the same (producer_id, seqno) pair is received again, YDB silently skips it.
When auto_seqno=True the SDK assigns monotonically increasing sequence numbers. Set auto_seqno=False and provide seqno manually when you need deterministic IDs.
Sending Messages
Simple write — non-blocking; buffers the message internally and returns immediately:
writer.write("text message")
writer.write(b"\x01\x02\x03") # bytes
writer.write(["msg-1", "msg-2"]) # send multiple messages in one call
Full message form with optional fields:
writer.write(ydb.TopicWriterMessage(
data="hello",
seqno=42, # omit when auto_seqno=True
created_at=datetime.datetime.now(), # omit when auto_created_at=True
metadata_items={"key": "value"},
))
Write and wait for server acknowledgment:
# Blocks until the server confirms the write.
result = writer.write_with_ack("important message")
# Get a Future and wait for it later (synchronous client).
future = writer.async_write_with_ack("important message")
future.result() # blocks here
# Async client — await directly.
result = await writer.write_with_ack("important message")
Flush — wait for all previously buffered messages to be acknowledged:
for i in range(100):
writer.write(f"message-{i}")
writer.flush() # blocks until all 100 messages are acked
# Async version:
await writer.flush()
Async Write Pattern
For high-throughput pipelines, buffer writes and gather futures:
import concurrent.futures
futures = []
for i in range(100):
future = writer.async_write_with_ack(f"message-{i}")
futures.append(future)
concurrent.futures.wait(futures)
for f in futures:
if f.exception():
raise f.exception()
Writer Backpressure
By default the writer’s internal buffer is unbounded — write() always returns immediately
regardless of how many unacknowledged messages are in flight. Enable backpressure by setting
one or both limits:
writer = driver.topic_client.writer(
"/local/my-topic",
max_buffer_size_bytes=50 * 1024 * 1024, # pause when 50 MB in flight
max_buffer_messages=1000, # pause when 1000 messages in flight
)
A message is counted as occupying the buffer from the moment it is passed to write()
until the server acknowledges it. Backpressure is active when at least one limit is set;
setting both means either limit can trigger a wait (OR semantics).
The limits are soft: write() blocks only if the buffer is already at or above the
limit when the call starts. Once unblocked, the entire batch is admitted regardless of its
size. This means callers that batch multiple messages in a single write() call will never
deadlock even when the batch is larger than the limit.
Blocking behavior (default)
When the buffer is at or above the limit, write() blocks until enough messages are
acknowledged by the server. There is no timeout by default — the call waits indefinitely:
# Producer pauses here if the buffer is full, then proceeds once space is freed.
writer.write("message")
Timeout
Set buffer_wait_timeout_sec to raise TopicWriterBufferFullError if space
does not free up in time. Use a positive value to wait up to that many seconds, or 0 to
fail immediately without waiting (non-blocking):
writer = driver.topic_client.writer(
"/local/my-topic",
max_buffer_messages=500,
buffer_wait_timeout_sec=5.0, # raise after 5 seconds; use 0 to fail immediately
)
try:
writer.write("message")
except ydb.TopicWriterBufferFullError:
# handle overload — log, drop, or apply back-off
...
Async client
The async writer behaves identically — await writer.write() suspends the coroutine
instead of blocking the thread:
writer = driver.topic_client.writer(
"/local/my-topic",
max_buffer_size_bytes=4 * 1024 * 1024,
buffer_wait_timeout_sec=10.0,
)
try:
await writer.write("message")
except ydb.TopicWriterBufferFullError:
...
To apply your own timeout without raising an error, wrap the call with
asyncio.wait_for:
try:
await asyncio.wait_for(writer.write("message"), timeout=2.0)
except asyncio.TimeoutError:
... # timed out waiting for buffer space
Reading Messages
Create a Reader
A reader requires a consumer name that must already exist on the topic.
Synchronous:
with driver.topic_client.reader("/local/my-topic", consumer="my-consumer") as reader:
message = reader.receive_message()
print(message.data.decode())
reader.commit(message)
Asynchronous:
async with driver.topic_client.reader("/local/my-topic", consumer="my-consumer") as reader:
message = await reader.receive_message()
print(message.data.decode())
reader.commit(message)
Reader Parameters
reader = driver.topic_client.reader(
topic="/local/my-topic", # str, TopicReaderSelector, or a list of these
consumer="my-consumer",
buffer_size_bytes=50 * 1024 * 1024, # client-side buffer (default: 50 MB)
buffer_release_threshold=0.5, # see below (default: 0.5)
)
buffer_size_bytes controls how many bytes the server is allowed to send before the client
signals that it is ready for more. The server will not exceed this limit.
buffer_release_threshold (float in [0.0, 1.0]) controls when the client sends a new
ReadRequest to the server after consuming messages from the local buffer:
0.0 — send a ReadRequest immediately after every batch is consumed. Produces more round-trips when many small batches arrive.
> 0.0 — accumulate freed bytes until they reach threshold × buffer_size_bytes, then send a single ReadRequest covering the accumulated amount. This reduces network round-trips. The default is 0.5.
Example — reduce round-trips for a high-throughput reader with many small messages:
reader = driver.topic_client.reader(
"/local/my-topic",
consumer="my-consumer",
buffer_size_bytes=50 * 1024 * 1024,
buffer_release_threshold=0.2, # send ReadRequest after freeing 10 MiB
)
To read from multiple topics at once, pass a list:
reader = driver.topic_client.reader(
topic=["/local/topic-a", "/local/topic-b"],
consumer="my-consumer",
)
To fine-tune per-topic settings (e.g. start offset or timestamp), use TopicReaderSelector:
reader = driver.topic_client.reader(
topic=ydb.TopicReaderSelector(
path="/local/my-topic",
read_from=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
),
consumer="my-consumer",
)
Receiving Messages
One message at a time:
# Synchronous — blocks until a message arrives.
message = reader.receive_message()
print(message.data.decode())
reader.commit(message)
# With timeout — raises TimeoutError if no message arrives within 1 second.
try:
message = reader.receive_message(timeout=1)
except TimeoutError:
print("no new messages")
# Asynchronous:
message = await reader.receive_message()
Batch processing:
# Synchronous
batch = reader.receive_batch()
for message in batch.messages:
process(message)
reader.commit(batch) # commit the whole batch at once
# With size limits
batch = reader.receive_batch(max_messages=100)
# Asynchronous
batch = await reader.receive_batch()
for message in batch.messages:
process(message)
reader.commit(batch)
Message Fields
Each received message exposes:
message.data # bytes — the message payload
message.seqno # int — producer sequence number
message.offset # int — partition offset
message.partition_id # int — partition this message came from
message.producer_id # str — producer_id set by the writer
message.created_at # datetime — timestamp set by the writer
message.written_at # datetime — timestamp when YDB persisted the message
message.metadata_items # Dict[str, bytes] — arbitrary key-value metadata
Committing Offsets
Committing tells YDB that the consumer has successfully processed a message. YDB resumes from the last committed offset when the reader reconnects.
# Non-blocking commit — buffered and sent in the background.
reader.commit(message)
reader.commit(batch)
# Blocking commit — waits for the server to acknowledge the commit.
reader.commit_with_ack(message)
# Async version:
await reader.commit_with_ack(message)
Note
commit() is non-blocking and sufficient for most use cases. The offset is flushed
automatically when the reader is closed (flush=True by default).
Handling Partition Rebalancing
When the topic rebalances (partitions are reassigned between reader instances), unfinished
messages from revoked partitions become stale. The SDK signals this via the alive
property:
# Per message
message = reader.receive_message()
# ... do some work ...
if message.alive:
reader.commit(message)
# Per batch — check before expensive processing
batch = reader.receive_batch()
for message in batch.messages:
if not batch.alive:
break # partition revoked, skip remaining messages
process(message)
if batch.alive:
reader.commit(batch)
Transactions
The SDK integrates with YDB transactions so that topic writes and reads can be made atomically together with database queries.
Transactional Write
Use topic_client.tx_writer(tx, topic) inside a transaction callback:
Synchronous:
def write_with_tx(driver: ydb.Driver, topic: str):
with ydb.QuerySessionPool(driver) as pool:
def callee(tx: ydb.QueryTxContext):
tx_writer = driver.topic_client.tx_writer(tx, topic)
for i in range(10):
tx_writer.write(ydb.TopicWriterMessage(f"message-{i}"))
pool.retry_tx_sync(callee)
Asynchronous:
async def write_with_tx(driver: ydb.aio.Driver, topic: str):
async with ydb.aio.QuerySessionPool(driver) as pool:
async def callee(tx: ydb.aio.QueryTxContext):
tx_writer = driver.topic_client.tx_writer(tx, topic)
for i in range(10):
await tx_writer.write(ydb.TopicWriterMessage(f"message-{i}"))
await pool.retry_tx_async(callee)
Messages written via a tx_writer are visible to readers only after the transaction commits.
If the transaction rolls back, the messages are discarded.
Transactional Read
Use reader.receive_batch_with_tx(tx) to read messages inside a transaction. The commit
offset is advanced atomically with the transaction:
Synchronous:
def read_with_tx(driver: ydb.Driver, topic: str, consumer: str):
with driver.topic_client.reader(topic, consumer) as reader:
with ydb.QuerySessionPool(driver) as pool:
def callee(tx: ydb.QueryTxContext):
batch = reader.receive_batch_with_tx(tx, max_messages=10)
for message in batch.messages:
process(message)
pool.retry_tx_sync(callee)
Asynchronous:
async def read_with_tx(driver: ydb.aio.Driver, topic: str, consumer: str):
async with driver.topic_client.reader(topic, consumer) as reader:
async with ydb.aio.QuerySessionPool(driver) as pool:
async def callee(tx: ydb.aio.QueryTxContext):
batch = await reader.receive_batch_with_tx(tx, max_messages=10)
for message in batch.messages:
process(message)
await pool.retry_tx_async(callee)
Note
Do not call reader.commit() when using receive_batch_with_tx — the commit is
handled automatically by the transaction.
Auto-Partitioning
YDB can automatically scale the number of partitions up (and optionally down) based on write throughput. Enable it when creating a topic:
import datetime
import ydb
driver.topic_client.create_topic(
"/local/my-topic",
consumers=["my-consumer"],
max_active_partitions=100,
auto_partitioning_settings=ydb.TopicAutoPartitioningSettings(
strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
up_utilization_percent=70, # split a partition when it reaches 70% capacity
down_utilization_percent=30, # merge partitions when below 30% (SCALE_UP_AND_DOWN only)
stabilization_window=datetime.timedelta(seconds=60),
),
)
Available TopicAutoPartitioningStrategy values:
SCALE_UP — only split partitions (add capacity).
SCALE_UP_AND_DOWN — split and merge partitions automatically.
DISABLED — static partition count.
The reader handles partition splits and merges transparently when auto_partitioning_support=True
(which is the default):
reader = driver.topic_client.reader(
"/local/my-topic",
consumer="my-consumer",
auto_partitioning_support=True, # default
)
Custom Codecs
The SDK supports RAW (no compression) and GZIP out of the box. You can register custom encode/decode functions for any codec ID:
Custom encoder (writer side):
import zlib
def my_compress(data: bytes) -> bytes:
return zlib.compress(data)
CUSTOM_CODEC = 10001 # must be in range 10000–19999
writer = driver.topic_client.writer(
"/local/my-topic",
codec=CUSTOM_CODEC,
encoders={CUSTOM_CODEC: my_compress},
)
Custom decoder (reader side):
def my_decompress(data: bytes) -> bytes:
return zlib.decompress(data)
reader = driver.topic_client.reader(
"/local/my-topic",
consumer="my-consumer",
decoders={CUSTOM_CODEC: my_decompress},
)
For CPU-intensive codecs you can offload encoding/decoding to a thread pool:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
writer = driver.topic_client.writer(
"/local/my-topic",
codec=CUSTOM_CODEC,
encoders={CUSTOM_CODEC: my_compress},
encoder_executor=executor,
)
Topic Client Settings
Pass TopicClientSettings to the driver config to tune SDK-wide defaults:
config = ydb.DriverConfig(
endpoint="grpc://localhost:2136",
database="/local",
)
config.topic_client_settings = ydb.TopicClientSettings(
encode_decode_threads_count=4, # threads used for codec operations (default: 1)
)
driver = ydb.Driver(config)
Complete Examples
Synchronous: Write and Read
import ydb
endpoint = "grpc://localhost:2136"
database = "/local"
topic_path = "/local/my-topic"
consumer = "my-consumer"
with ydb.Driver(endpoint=endpoint, database=database) as driver:
driver.wait(timeout=5, fail_fast=True)
# Create topic (skip if already exists)
try:
driver.topic_client.drop_topic(topic_path)
except ydb.SchemeError:
pass
driver.topic_client.create_topic(topic_path, consumers=[consumer])
# Write messages
with driver.topic_client.writer(topic_path) as writer:
for i in range(10):
writer.write(f"message-{i}")
# Read messages
with driver.topic_client.reader(topic_path, consumer=consumer) as reader:
for _ in range(10):
try:
message = reader.receive_message(timeout=5)
print(message.data.decode())
reader.commit(message)
except TimeoutError:
break
Asynchronous: Write and Read Concurrently
import asyncio
import ydb
endpoint = "grpc://localhost:2136"
database = "/local"
topic_path = "/local/my-topic"
consumer = "my-consumer"
async def write_messages(driver: ydb.aio.Driver):
async with driver.topic_client.writer(topic_path) as writer:
for i in range(10):
await writer.write(ydb.TopicWriterMessage(
data=f"message-{i}",
metadata_items={"index": str(i)},
))
async def read_messages(driver: ydb.aio.Driver):
async with driver.topic_client.reader(topic_path, consumer=consumer) as reader:
while True:
try:
message = await asyncio.wait_for(reader.receive_message(), timeout=5)
print(message.data.decode(), message.metadata_items)
reader.commit(message)
except asyncio.TimeoutError:
return
async def main():
async with ydb.aio.Driver(endpoint=endpoint, database=database) as driver:
await driver.wait(timeout=5, fail_fast=True)
# Create topic
try:
await driver.topic_client.drop_topic(topic_path)
except ydb.SchemeError:
pass
await driver.topic_client.create_topic(topic_path, consumers=[consumer])
await asyncio.gather(write_messages(driver), read_messages(driver))
asyncio.run(main())