Application-level Filtering

FastStream also lets you choose how a message is processed based on its headers, body type, or any other property of the message. With the filter feature, you can consume messages with different schemas from a single event stream.

Tip

Each message must be consumed only once: overlapping filters are not allowed.
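
One way to sanity-check that filters do not overlap is to run them against representative messages and confirm that at most one filter accepts each of them. The sketch below is plain Python and does not use FastStream; `SampleMessage`, `matching_filters`, and the `kind` header are hypothetical names that exist only for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class SampleMessage:
    """Hypothetical stand-in for an incoming message (not a FastStream class)."""

    content_type: str
    headers: Dict[str, str] = field(default_factory=dict)


# Named filters, written the way they might be passed to `filter=`.
FILTERS: Dict[str, Callable[[SampleMessage], bool]] = {
    "json": lambda msg: msg.content_type == "application/json",
    "orders": lambda msg: msg.headers.get("kind") == "order",
}


def matching_filters(msg: SampleMessage) -> List[str]:
    """Return the names of all filters that accept the message."""
    return [name for name, accepts in FILTERS.items() if accepts(msg)]


plain_order = SampleMessage("text/plain", {"kind": "order"})
json_order = SampleMessage("application/json", {"kind": "order"})

print(matching_filters(plain_order))  # ['orders']
print(matching_filters(json_order))   # ['json', 'orders']: these filters overlap
```

A JSON order is accepted by both filters here, so one of them would need to be tightened before the pair is used on the same subscriber.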

As an example, let's create a subscriber for both JSON and non-JSON messages:

AIOKafka

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

subscriber = broker.subscriber("test-topic")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

Confluent

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

subscriber = broker.subscriber("test-topic")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

RabbitMQ

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

subscriber = broker.subscriber("test-queue")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

NATS

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

subscriber = broker.subscriber("test-subject")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

Redis

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

subscriber = broker.subscriber("test-channel")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

Note

A handler registered without a filter is the default handler. It consumes any message that no filtered handler has accepted.

With the subscriber defined above, the following message is delivered to the handle function:

AIOKafka

await broker.publish(
    {"name": "John", "user_id": 1},
    topic="test-topic",
)

Confluent

await broker.publish(
    {"name": "John", "user_id": 1},
    topic="test-topic",
)

RabbitMQ

await broker.publish(
    {"name": "John", "user_id": 1},
    queue="test-queue",
)

NATS

await broker.publish(
    {"name": "John", "user_id": 1},
    subject="test-subject",
)

Redis

await broker.publish(
    {"name": "John", "user_id": 1},
    channel="test-channel",
)

And this one is delivered to default_handler:

AIOKafka

await broker.publish(
    "Hello, FastStream!",
    topic="test-topic",
)

Confluent

await broker.publish(
    "Hello, FastStream!",
    topic="test-topic",
)

RabbitMQ

await broker.publish(
    "Hello, FastStream!",
    queue="test-queue",
)

NATS

await broker.publish(
    "Hello, FastStream!",
    subject="test-subject",
)

Redis

await broker.publish(
    "Hello, FastStream!",
    channel="test-channel",
)
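
The routing of these two messages depends on the content type attached to each published body. The sketch below assumes the content type is derived from the Python type of the body (`application/json` for dictionaries and lists, `text/plain` for strings). `guess_content_type` and `route` are hypothetical helpers written for illustration, not FastStream functions, and the library itself may treat other body types differently.

```python
from typing import Any, Optional


def guess_content_type(body: Any) -> Optional[str]:
    """Assumed mapping from a Python body to a content type (illustrative only)."""
    if isinstance(body, (dict, list)):
        return "application/json"
    if isinstance(body, str):
        return "text/plain"
    return None  # e.g. raw bytes: no content type is assumed here


def route(body: Any) -> str:
    """Name the handler from the example that would receive the body."""
    if guess_content_type(body) == "application/json":
        return "handle"
    return "default_handler"


print(route({"name": "John", "user_id": 1}))  # handle
print(route("Hello, FastStream!"))            # default_handler
```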

Technical Information

Let's break down how message filtering works in a subscription mechanism.

Core Filtering Logic

Consider a simple example of a filter implementation:

for handler in subscriber.handlers:
    if await handler.filter(msg):
        return await handler.process(msg)

raise HandlerNotFoundError

This code selects the first suitable handler to process the message, which is why the default handler should be placed last in the list. If no filtered handler matches, the message must still be processed, so a special "trash" handler is needed to define the system's default behavior in that case.
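
The first-match loop can be turned into a small runnable sketch. This is a simplified model rather than FastStream's internal code: the `Handler` dataclass, the `dispatch` function, the locally defined `HandlerNotFoundError`, and the dictionary-shaped messages are all assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List


class HandlerNotFoundError(Exception):
    """Raised when no handler's filter accepts the message."""


@dataclass
class Handler:
    filter: Callable[[Any], Awaitable[bool]]
    process: Callable[[Any], Awaitable[Any]]


async def dispatch(handlers: List[Handler], msg: Any) -> Any:
    # Handlers are tried in registration order; the first match wins.
    for handler in handlers:
        if await handler.filter(msg):
            return await handler.process(msg)
    raise HandlerNotFoundError(msg)


async def is_json(msg: dict) -> bool:
    return msg.get("content_type") == "application/json"


async def accept_all(msg: dict) -> bool:
    return True


async def handle_json(msg: dict) -> str:
    return "handle"


async def handle_default(msg: dict) -> str:
    return "default_handler"


# The catch-all handler is registered last so it only sees unmatched messages.
handlers = [Handler(is_json, handle_json), Handler(accept_all, handle_default)]

print(asyncio.run(dispatch(handlers, {"content_type": "application/json"})))  # handle
print(asyncio.run(dispatch(handlers, {"content_type": "text/plain"})))        # default_handler
```

Without the catch-all entry, the second call would raise `HandlerNotFoundError`, which is the situation the trash handler is meant to prevent.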

Implementing the Default Handler

The default handler should be declared as follows:

subscriber = broker.subscriber()

@subscriber(filter=...)
async def handler(): ...

@subscriber()
async def default_handler(): ...

Here, @subscriber() is equivalent to @subscriber(filter=lambda _: True), meaning it accepts all messages. This ensures that no message goes unprocessed, even if no specific handler is found.
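
To see why an accept-all default filter makes registration order matter, consider the toy registry below. It is a hypothetical, synchronous simplification (`ToySubscriber`, `accept_all`, `on_dict`, and `on_anything` are illustrative names, not FastStream APIs) in which omitting `filter` falls back to a predicate that always returns `True`.

```python
from typing import Any, Callable, List, Tuple

Filter = Callable[[Any], bool]


def accept_all(_: Any) -> bool:
    """Default filter: accepts every message."""
    return True


class ToySubscriber:
    """Hypothetical, simplified registry (not FastStream's implementation)."""

    def __init__(self) -> None:
        self.handlers: List[Tuple[Filter, Callable[[Any], Any]]] = []

    def __call__(self, filter: Filter = accept_all):
        def register(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
            self.handlers.append((filter, func))
            return func

        return register

    def consume(self, msg: Any) -> Any:
        # First handler whose filter accepts the message processes it.
        for accepts, func in self.handlers:
            if accepts(msg):
                return func(msg)
        raise LookupError("no handler accepted the message")


def on_dict(msg: Any) -> str:
    return "on_dict"


def on_anything(msg: Any) -> str:
    return "on_anything"


ordered = ToySubscriber()
ordered(filter=lambda msg: isinstance(msg, dict))(on_dict)
ordered()(on_anything)  # default handler registered last

shadowing = ToySubscriber()
shadowing()(on_anything)  # default handler registered first
shadowing(filter=lambda msg: isinstance(msg, dict))(on_dict)

print(ordered.consume({"id": 1}))    # on_dict
print(ordered.consume("text"))       # on_anything
print(shadowing.consume({"id": 1}))  # on_anything: the filtered handler is never reached
```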

Summary

  • Handlers are checked in order, and the first matching one processes the message.
  • The default handler must be placed last, so that it receives only the messages that no filtered handler accepted and every message is still handled.
  • @subscriber() without parameters acts as a universal handler, accepting everything.
  • A trash handler must properly finish handling each unmatched message and let the broker know that the data is not needed.

Properly managing subscribers allows for precise message processing control and prevents data loss.