Redis Stream Basic Subscriber#

To start consuming from a Redis stream, decorate your consuming function with the @broker.subscriber(...) decorator, passing the stream key as a string in the stream argument.

In the following example, we will create a simple FastStream app that consumes messages from the "test-stream" Redis stream.

The full app code looks like this:

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber(stream="test-stream")
async def handle(msg: str, logger: Logger):
    logger.info(msg)

Import FastStream and RedisBroker#

To use the @broker.subscriber(...) decorator, we first need to import the base FastStream app and RedisBroker to create our broker. Logger is imported as well, because the consumer function uses it to annotate its logger argument.

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

Create a RedisBroker#

Next, we will create a RedisBroker object and wrap it in a FastStream object so that we can start our app using the CLI later.

broker = RedisBroker()
app = FastStream(broker)

Create a Function that will Consume Messages from a Redis stream#

Let’s create a consumer function that will consume messages from the "test-stream" Redis stream and log them.

@broker.subscriber(stream="test-stream")
async def handle(msg: str, logger: Logger):
    logger.info(msg)

The function decorated with the @broker.subscriber(...) decorator will be called when a message is produced to the Redis stream.

The message will then be injected into the typed msg argument of the function, and its type will be used to parse the message.
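
To make the idea of annotation-driven parsing concrete, here is a minimal standard-library sketch. It is not FastStream's implementation (FastStream relies on Pydantic for validation); the helper decode_for and the three handlers are hypothetical names used only for illustration, and the sketch supports only plain types such as str, int, float, and dict.

```python
import inspect
import json
from typing import Any, Callable, get_type_hints


def decode_for(handler: Callable[..., Any], raw: bytes) -> Any:
    """Hypothetical helper: decode raw bytes into the type annotated
    on the handler's first parameter (plain types only)."""
    first_param = next(iter(inspect.signature(handler).parameters))
    # Fall back to str when the first parameter has no annotation.
    target = get_type_hints(handler).get(first_param, str)
    text = raw.decode("utf-8")
    if target is str:
        # A str annotation keeps the decoded text as-is.
        return text
    # Any other annotation: parse the text as JSON, then coerce if needed.
    value = json.loads(text)
    if not isinstance(value, target):
        value = target(value)
    return value


def handle_text(msg: str) -> str:
    return msg


def handle_number(msg: int) -> int:
    return msg


def handle_mapping(msg: dict) -> dict:
    return msg
```

The same bytes b"42" would reach handle_text as the string "42" and handle_number as the integer 42, which is the behaviour the typed msg argument is meant to convey.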

In this example, when a message is sent to the "test-stream" stream, the handle function receives it and the logger logs the message content.