Redis Stream Basic Subscriber#
To start consuming from a Redis stream, simply decorate your consuming function with the @broker.subscriber(...)
decorator, passing a string as the stream key.
In the following example, we will create a simple FastStream app that will consume messages from a "test-stream"
Redis stream.
The full app code looks like this:
Import FastStream and RedisBroker#
To use the @broker.subscriber(...)
decorator, first, we need to import the base FastStream app and RedisBroker to create our broker.
Create a RedisBroker#
Next, we will create a RedisBroker
object and wrap it into the FastStream
object so that we can start our app using CLI later.
Create a Function that will Consume Messages from a Redis stream#
Let’s create a consumer function that will consume messages from "test-stream"
Redis stream and log them.
The function decorated with the @broker.subscriber(...)
decorator will be called when a message is produced to the Redis stream.
The message will then be injected into the typed msg
argument of the function, and its type will be used to parse the message.
In this example case, when the message is sent to a "test-stream"
stream, it will be received by the handle
function, and the logger
will log the message content.