Skip to content

Application-level Filtering

FastStream also allows you to choose how a message is processed based on its headers, body type, or any other attribute. The filter feature enables you to consume messages with different schemas within a single event stream.

Tip

Each message must be consumed exactly ONCE (overlapping filters are not allowed).

As an example, let's create a subscriber for both JSON and non-JSON messages:

# Example: Kafka broker imported from faststream.kafka.
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# A single subscriber object for "test-topic"; both handlers below attach to it.
subscriber = broker.subscriber("test-topic")

# Filtered handler: the lambda is evaluated against the incoming message and
# selects messages whose content type is "application/json".
@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    # The asserts pin the payload this example expects.
    assert name == "John"
    assert user_id == 1

# Default handler: attached to the same subscriber without a filter.
@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"
# Example: Kafka broker imported from faststream.confluent.
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# A single subscriber object for "test-topic"; both handlers below attach to it.
subscriber = broker.subscriber("test-topic")

# Filtered handler: the lambda is evaluated against the incoming message and
# selects messages whose content type is "application/json".
@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    # The asserts pin the payload this example expects.
    assert name == "John"
    assert user_id == 1

# Default handler: attached to the same subscriber without a filter.
@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"
# Example: RabbitMQ broker imported from faststream.rabbit.
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

# A single subscriber object for "test-queue"; both handlers below attach to it.
subscriber = broker.subscriber("test-queue")

# Filtered handler: the lambda is evaluated against the incoming message and
# selects messages whose content type is "application/json".
@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    # The asserts pin the payload this example expects.
    assert name == "John"
    assert user_id == 1

# Default handler: attached to the same subscriber without a filter.
@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"
# Example: NATS broker imported from faststream.nats.
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

# A single subscriber object for "test-subject"; both handlers below attach to it.
subscriber = broker.subscriber("test-subject")

# Filtered handler: the lambda is evaluated against the incoming message and
# selects messages whose content type is "application/json".
@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    # The asserts pin the payload this example expects.
    assert name == "John"
    assert user_id == 1

# Default handler: attached to the same subscriber without a filter.
@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"
# Example: Redis broker imported from faststream.redis.
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

# A single subscriber object for "test-channel"; both handlers below attach to it.
subscriber = broker.subscriber("test-channel")

# Filtered handler: the lambda is evaluated against the incoming message and
# selects messages whose content type is "application/json".
@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    # The asserts pin the payload this example expects.
    assert name == "John"
    assert user_id == 1

# Default handler: attached to the same subscriber without a filter.
@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

Note

A subscriber without a filter is a default subscriber. It consumes the messages that none of the filtered subscribers accepted.

With this setup, the following message will be delivered to the `handle` function:

# Each call below is a separate per-broker variant of the same publish; the
# destination keyword matches the name used by that broker's subscriber example.

# Kafka variant: destination is the "test-topic" topic.
# NOTE(review): presumably this first copy pairs with faststream.kafka and the
# identical one that follows with faststream.confluent — confirm.
await broker.publish(
    {"name": "John", "user_id": 1},
    topic="test-topic",
)
# Kafka variant (second, identical copy): destination is the "test-topic" topic.
await broker.publish(
    {"name": "John", "user_id": 1},
    topic="test-topic",
)
# RabbitMQ variant: destination is the "test-queue" queue.
await broker.publish(
    {"name": "John", "user_id": 1},
    queue="test-queue",
)
# NATS variant: destination is the "test-subject" subject.
await broker.publish(
    {"name": "John", "user_id": 1},
    subject="test-subject",
)
# Redis variant: destination is the "test-channel" channel.
await broker.publish(
    {"name": "John", "user_id": 1},
    channel="test-channel",
)

And this one will be delivered to the `default_handler`:

# Each call below is a separate per-broker variant of the same publish; the
# body is a plain string and the destination keyword matches the name used by
# that broker's subscriber example.

# Kafka variant: destination is the "test-topic" topic.
# NOTE(review): presumably this first copy pairs with faststream.kafka and the
# identical one that follows with faststream.confluent — confirm.
await broker.publish(
    "Hello, FastStream!",
    topic="test-topic",
)
# Kafka variant (second, identical copy): destination is the "test-topic" topic.
await broker.publish(
    "Hello, FastStream!",
    topic="test-topic",
)
# RabbitMQ variant: destination is the "test-queue" queue.
await broker.publish(
    "Hello, FastStream!",
    queue="test-queue",
)
# NATS variant: destination is the "test-subject" subject.
await broker.publish(
    "Hello, FastStream!",
    subject="test-subject",
)
# Redis variant: destination is the "test-channel" channel.
await broker.publish(
    "Hello, FastStream!",
    channel="test-channel",
)