Redis Pipeline#
FastStream supports Redis pipelining to optimize performance when publishing multiple messages in a batch. This allows you to queue several Redis operations and execute them in one network round-trip, reducing latency significantly.
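To make the round-trip saving concrete, here is a toy in-memory model of pipelining. It is not Redis or FastStream code: `ToyServer` and `ToyPipeline` are hypothetical classes invented for this illustration, and each call to `send()` stands in for one network round-trip.

```python
import asyncio


class ToyServer:
    """In-memory stand-in for a Redis server that counts round-trips."""

    def __init__(self) -> None:
        self.round_trips = 0
        self.channels: dict[str, list[str]] = {}

    async def send(self, commands: list[tuple[str, str]]) -> list[int]:
        # One call to send() models one network round-trip,
        # no matter how many commands it carries.
        self.round_trips += 1
        replies = []
        for channel, message in commands:
            self.channels.setdefault(channel, []).append(message)
            replies.append(len(self.channels[channel]))
        return replies


class ToyPipeline:
    """Buffers commands locally and flushes them in a single send()."""

    def __init__(self, server: ToyServer) -> None:
        self._server = server
        self._buffer: list[tuple[str, str]] = []

    def publish(self, channel: str, message: str) -> None:
        # Nothing is sent yet; the command is only queued.
        self._buffer.append((channel, message))

    async def execute(self) -> list[int]:
        # Swap out the buffer, then send everything in one go.
        commands, self._buffer = self._buffer, []
        return await self._server.send(commands)


async def demo() -> tuple[int, int]:
    # Without a pipeline: one round-trip per message.
    plain = ToyServer()
    for i in range(10):
        await plain.send([("test-output", f"hello {i}")])

    # With a pipeline: ten queued commands, one round-trip.
    piped = ToyServer()
    pipe = ToyPipeline(piped)
    for i in range(10):
        pipe.publish("test-output", f"hello {i}")
    await pipe.execute()

    return plain.round_trips, piped.round_trips


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (10, 1)
```

The model ignores real-world details such as reply parsing and connection handling; it only shows why queuing commands reduces the number of network exchanges.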
Usage Example#
API#
You can pass the pipeline parameter to the publish method to delay the execution of Redis commands. The commands will only be executed after you explicitly call await pipe.execute().
The pipeline object is injected by the Pipeline annotation. Pipeline is a Redis pipeline object (redis.asyncio.client.Pipeline) wrapped in a FastStream dependency, and it is automatically available in any subscriber.
Batch Publishing with Pipeline#
When you use broker.publish_batch() together with the pipeline parameter, all messages sent through the pipeline are queued and, once you call await pipe.execute(), processed by the subscriber as a single batch. The subscriber handles them in one execution, which makes batch processing more efficient.
Notes#
- Pipelining is supported for all Redis queue types, including channels, lists, and streams.
- You can combine multiple queue types in a single pipeline.
Benefits#
- Reduces network traffic by batching Redis commands.
- Improves performance in high-volume scenarios.
- Fully integrates with FastStream's dependency injection system.
- Allows for efficient batch processing when using broker.publish_batch() and pipeline, as all messages are processed as a single entity by the subscriber after await pipe.execute().