Channel Subscription#

Basic Channel Subscription#

Redis Pub/Sub is the default subscriber type in FastStream, so passing a channel name to a regular @broker.subscriber("channel_name") decorator creates a Redis Pub/Sub subscriber.

In this example, we will build a FastStream application that listens to messages from the Redis channel named "test".

The complete application code is presented below:

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber("test")
async def handle(msg: str, logger: Logger):
    logger.info(msg)

Import FastStream and RedisBroker#

To utilize the @broker.subscriber(...) decorator for Redis channel subscription, you must first import FastStream and RedisBroker.

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

Create a RedisBroker Instance#

Create a RedisBroker object and pass it to the FastStream object. This setup prepares the application for launch using the FastStream CLI.

broker = RedisBroker()
app = FastStream(broker)

Define the Message Handler Function#

Define a function that consumes messages from the "test" channel and uses the logger to output the message content.

@broker.subscriber("test")
async def handle(msg: str, logger: Logger):
    logger.info(msg)

When a message is published to the Redis channel "test", it will trigger the invocation of the decorated function. The message will be passed to the function's msg parameter, while the logger will be available for logging purposes.

Pattern Channel Subscription#

To subscribe to multiple Redis channels matching a pattern, use the @broker.subscriber(channel=PubSub("pattern", pattern=True)) decorator. The channel argument receives a PubSub object that holds the pattern and has its pattern flag set to True.

Here's how to create a FastStream application that subscribes to all channels matching the "test.*" pattern:

from faststream import FastStream, Logger
from faststream.redis import PubSub, RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber(channel=PubSub("test.*", pattern=True))
async def handle_test(msg: str, logger: Logger):
    logger.info(msg)

Use PubSub for Pattern Matching#

Import the PubSub class from faststream.redis along with other necessary modules.

from faststream import FastStream, Logger
from faststream.redis import PubSub, RedisBroker

Specify the Pattern for Channel Subscription#

To define the pattern subscription, create a PubSub object with the desired pattern ("test.*" in this case) and indicate that it's a pattern subscription by setting pattern=True.

@broker.subscriber(channel=PubSub("test.*", pattern=True))

Create the Pattern Message Handler Function#

Define a function that will act as the subscriber of messages from channels matching the specified pattern. Messages are logged the same way as in the basic channel subscription.

@broker.subscriber(channel=PubSub("test.*", pattern=True))
async def handle_test(msg: str, logger: Logger):
    logger.info(msg)

With pattern channel subscription, when a message is published to a channel that matches the specified pattern ("test.*"), our handler function will be invoked. The message is delivered to the msg argument of the function, similar to how it works in basic channel subscriptions.
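To get a feel for which channel names a pattern such as "test.*" would cover, the glob matching can be approximated locally. The sketch below uses Python's standard fnmatch module as a stand-in for Redis's glob-style pattern matching; it is only an approximation (for example, the two differ in character-class negation syntax), and the `matches` helper and the sample channel names are hypothetical, not part of FastStream or Redis.

```python
from fnmatch import fnmatchcase

# Pattern from the example above.
PATTERN = "test.*"


def matches(channel: str, pattern: str = PATTERN) -> bool:
    """Approximate Redis glob-style matching with fnmatch (case-sensitive)."""
    return fnmatchcase(channel, pattern)


# Hypothetical channel names, used only for illustration.
channels = ["test.orders", "test.users.created", "test", "other.orders"]

# Keep only the channels the pattern subscriber would be expected to receive.
matched = [channel for channel in channels if matches(channel)]
print(matched)
```

Note that `*` also spans dots in this approximation, so "test.users.created" matches, while the bare channel "test" does not because the pattern requires the "test." prefix.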

Pattern Data Access#

You can also use the Redis Pub/Sub pattern feature to encode data directly in the channel name. In FastStream, name the variable part of the channel in curly braces (here "test.{data}") and declare a handler parameter with the same name and a Path() default to receive it:

from faststream import FastStream, Logger, Path
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber("test.{data}")
async def handle_test(
    msg: str,
    logger: Logger,
    data: str = Path(),
):
    logger.info(f"Channel `{data=}`, body `{msg=}`")
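The idea behind this kind of channel-name extraction can be illustrated with the standard library alone. The following is a rough sketch, not FastStream's actual implementation: the helpers `template_to_regex` and `extract_path_params` are hypothetical, and the sketch assumes each placeholder captures a single dot-free segment.

```python
import re


def template_to_regex(template: str) -> re.Pattern:
    """Turn a template like "test.{data}" into a regex with named groups."""
    # re.split with a capturing group alternates literal text and names:
    # "test.{data}" -> ["test.", "data", ""]
    parts = re.split(r"\{(\w+)\}", template)
    pattern = ""
    for index, part in enumerate(parts):
        if index % 2 == 0:
            # Literal text: escape it so "." only matches a real dot.
            pattern += re.escape(part)
        else:
            # Placeholder: assumed here to capture one dot-free segment.
            pattern += f"(?P<{part}>[^.]+)"
    return re.compile(pattern)


def extract_path_params(template: str, channel: str):
    """Return the captured values as a dict, or None if there is no match."""
    match = template_to_regex(template).fullmatch(channel)
    return match.groupdict() if match else None


print(extract_path_params("test.{data}", "test.orders"))
print(extract_path_params("test.{data}", "other.orders"))
```

Under these assumptions, a message published to "test.orders" would reach the handler with data equal to "orders", while a channel outside the template yields no match.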