Redis Stream Publishing with FastStream

Publishing Data to Redis Stream

To publish messages to a Redis stream, implement a function that processes the incoming data and decorate it with @broker.publisher(...), passing the name of the target stream. FastStream then publishes the function's return value to that stream.

  1. Create your RedisBroker instance

    broker = RedisBroker("redis://localhost:6379")
    
  2. Initiate your FastStream application with the RedisBroker

    app = FastStream(broker)
    
  3. Define your data model

    class Data(BaseModel):
        data: NonNegativeFloat = Field(
            ..., examples=[0.5], description="Float data example"
        )
    
  4. Set up the function for data processing and publishing

    Stacking the @broker.publisher(...) decorator on the same function as the @broker.subscriber(...) decorator lets the handler consume a message from one stream, process it, and republish the result to a different stream.

    @broker.subscriber(stream="input-stream")
    @broker.publisher(stream="output-stream")
    async def on_input_data(msg: Data) -> Data:
        return Data(data=msg.data + 1.0)
    

    By decorating a function with @broker.publisher(...), we tell FastStream to publish the function's return value to the designated "output-stream". The same function also subscribes to "input-stream", so the two decorators together form a simple data pipeline between two Redis streams.
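To make the decorator stacking more concrete, the following toy model mimics the flow in memory using only the standard library. It is a conceptual sketch, not FastStream's implementation: `ToyBroker`, its `streams` and `handlers` attributes, and the use of a plain float instead of the `Data` model are all assumptions made for illustration. Only the decorator order and the stream names mirror the example above.

```python
import asyncio
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for illustration only; NOT FastStream's RedisBroker."""

    def __init__(self):
        self.streams = defaultdict(list)   # stream name -> messages stored so far
        self.handlers = defaultdict(list)  # stream name -> subscribed handlers

    def publisher(self, stream):
        # Wrap the handler so that its return value is published to `stream`.
        def decorator(func):
            async def wrapper(msg):
                result = await func(msg)
                await self.publish(result, stream)
                return result
            return wrapper
        return decorator

    def subscriber(self, stream):
        # Register the (already wrapped) handler for messages on `stream`.
        def decorator(func):
            self.handlers[stream].append(func)
            return func
        return decorator

    async def publish(self, msg, stream):
        # Store the message, then deliver it to every handler of that stream.
        self.streams[stream].append(msg)
        for handler in self.handlers[stream]:
            await handler(msg)


toy = ToyBroker()


@toy.subscriber(stream="input-stream")
@toy.publisher(stream="output-stream")
async def on_input_data(value: float) -> float:
    return value + 1.0


# Feed one message into the input stream and inspect the output stream.
asyncio.run(toy.publish(0.5, "input-stream"))
print(toy.streams["output-stream"])  # [1.5]
```

Decorators apply bottom-up, so the publisher wrapper is created first and the subscriber registers that wrapped function. In this sketch, that ordering is why a message arriving on "input-stream" produces a result on "output-stream" without the handler calling a publish method itself.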

Here's the complete example that showcases the use of decorators for both subscribing and publishing to Redis streams:

from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream
from faststream.redis import RedisBroker


class Data(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )


broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber(stream="input-stream")
@broker.publisher(stream="output-stream")
async def on_input_data(msg: Data) -> Data:
    return Data(data=msg.data + 1.0)