Skip to content

Redis Stream Consumer Groups#

Consuming messages from a Redis stream can be accomplished by using a Consumer Group. This allows multiple consumers to divide the workload of processing messages in a stream and provides a form of message acknowledgment, ensuring that messages are not processed repeatedly.

Consumer Groups in Redis enable a group of clients to cooperatively consume different portions of the same stream of messages. When using group="..." (which internally uses XREADGROUP), messages are distributed among different consumers in a group and are not delivered to any other consumer in that group again, unless they are not acknowledged (i.e., the client fails to process and does not call msg.ack() or XACK). This is in contrast to a normal consumer (also known as XREAD), where every consumer sees all the messages. XREAD is useful for broadcasting to multiple consumers, while XREADGROUP is better suited for workload distribution.

In the following example, we will create a simple FastStream app that utilizes a Redis stream with a Consumer Group. It will consume messages sent to the "test-stream" as part of the "test-group" consumer group.

The full app code is as follows:

from faststream import FastStream, Logger
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber(stream=StreamSub("test-stream", group="test-group", consumer="1"))
async def handle(msg: str, logger: Logger):
    logger.info(msg)


@app.after_startup
async def t():
    await broker.publish("Hi!", stream="test-stream")

Import FastStream and RedisBroker#

First, import the FastStream class and the RedisBroker from the faststream.redis module to define our broker.

from faststream import FastStream, Logger
from faststream.redis import RedisBroker, StreamSub

Create a RedisBroker#

To establish a connection to Redis, instantiate a RedisBroker object and pass it to the FastStream app.

broker = RedisBroker()
app = FastStream(broker)

Define a Consumer Group Subscription#

Define a subscription to a Redis stream with a specific Consumer Group using the StreamSub object and the @broker.subscriber(...) decorator. Then, define a function that will be triggered when new messages are sent to the "test-stream" Redis stream. This function is decorated with @broker.subscriber(...) and will process the messages as part of the "test-group" consumer group.

1
2
3
@broker.subscriber(stream=StreamSub("test-stream", group="test-group", consumer="1"))
async def handle(msg: str, logger: Logger):
    logger.info(msg)

Publishing a message#

Publishing a message is the same as what's defined on Stream Publishing.

await broker.publish("Hi!", stream="test-stream")

By following the steps and code examples provided above, you can create a FastStream application that consumes messages from a Redis stream using a Consumer Group for distributed message processing.