Redis Stream Consumer Groups#
Consuming messages from a Redis stream can be accomplished by using a Consumer Group. This allows multiple consumers to divide the workload of processing messages in a stream and provides a form of message acknowledgment, ensuring that messages are not processed repeatedly.
Consumer Groups in Redis enable a group of clients to cooperatively consume different portions of the same stream of messages. When using group="..."
(which internally uses XREADGROUP
), messages are distributed among different consumers in a group and are not delivered to any other consumer in that group again, unless they are not acknowledged (i.e., the client fails to process and does not call msg.ack()
or XACK
). This is in contrast to a normal consumer (also known as XREAD
), where every consumer sees all the messages. XREAD
is useful for broadcasting to multiple consumers, while XREADGROUP
is better suited for workload distribution.
In the following example, we will create a simple FastStream app that utilizes a Redis stream with a Consumer Group. It will consume messages sent to the "test-stream"
as part of the "test-group"
consumer group.
The full app code is as follows:
Import FastStream and RedisBroker#
First, import the FastStream
class and the RedisBroker
from the faststream.redis
module to define our broker.
Create a RedisBroker#
To establish a connection to Redis, instantiate a RedisBroker
object and pass it to the FastStream
app.
Define a Consumer Group Subscription#
Define a subscription to a Redis stream with a specific Consumer Group using the StreamSub
object and the @broker.subscriber(...)
decorator. Then, define a function that will be triggered when new messages are sent to the "test-stream"
Redis stream. This function is decorated with @broker.subscriber(...)
and will process the messages as part of the "test-group"
consumer group.
Publishing a message#
Publishing a message is the same as what's defined on Stream Publishing.
By following the steps and code examples provided above, you can create a FastStream application that consumes messages from a Redis stream using a Consumer Group for distributed message processing.