Skip to content

Redis Pipeline#

FastStream supports Redis pipelining to optimize performance when publishing multiple messages in a batch. This allows you to queue several Redis operations and execute them in one network round-trip, reducing latency significantly.

Usage Example#

from faststream import FastStream, Logger
from faststream.redis import RedisBroker, Pipeline

broker = RedisBroker()
app = FastStream(broker)

@broker.subscriber("test")
async def handle(
    msg: str,
    logger: Logger,
    pipe: Pipeline,
) -> None:
    logger.info(msg)

    for i in range(10):
        await broker.publish(
            f"hello {i}",
            channel="test-output",  # queue can be channel, list, or stream
            pipeline=pipe,
        )

    results = await pipe.execute()  # execute all publish commands
    logger.info(results)

@app.after_startup
async def t() -> None:
    await broker.publish("Hi!", "test")

API#

You can pass the pipeline parameter to the publish method to delay the execution of Redis commands. The commands will only be executed after you explicitly call await pipe.execute().

The pipeline object is injected by the Pipeline annotation:

from faststream.redis.annotations import Pipeline

Pipeline is a Redis pipeline object (redis.asyncio.client.Pipeline), which is wrapped in a FastStream dependency and will be automatically available in any subscriber.

Batch Publishing with Pipeline#

When using broker.publish_batch() in combination with the pipeline parameter, all messages sent through the pipeline are queued and processed by the subscriber as a single batch after calling await pipe.execute(). This allows the subscriber to handle all messages sent through the pipeline in a single execution, improving the efficiency of batch processing.

Notes#

  • Pipelining is supported for all Redis queue types, including channels, lists, and streams.
  • You can combine multiple queue types in a single pipeline.

Benefits#

  • Reduces network traffic by batching Redis commands.
  • Improves performance in high-volume scenarios.
  • Fully integrates with FastStream's dependency injection system.
  • Allows for efficient batch processing when using broker.publish_batch() and pipeline, as all messages are processed as a single entity by the subscriber after await pipe.execute().