Application-level Filtering
FastStream also allows you to choose how a message is processed based on its headers, body type, or anything else. The filter feature enables you to consume messages with different schemas within a single event stream.
Tip
A message must be consumed by exactly one handler (overlapping filters are not allowed).
As an example, let's create a subscriber for both JSON and non-JSON messages:
AIOKafka

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

subscriber = broker.subscriber("test-topic")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

Confluent

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

subscriber = broker.subscriber("test-topic")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

RabbitMQ

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

subscriber = broker.subscriber("test-queue")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

NATS

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

subscriber = broker.subscriber("test-subject")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"

Redis

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

subscriber = broker.subscriber("test-channel")

@subscriber(
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

@subscriber
async def default_handler(msg: str):
    assert msg == "Hello, FastStream!"
Note
A subscriber without a filter is the default subscriber. It consumes any message that has not been consumed yet.
For now, the following message will be delivered to the handle function:
AIOKafka

await broker.publish(
    {"name": "John", "user_id": 1},
    topic="test-topic",
)

Confluent

await broker.publish(
    {"name": "John", "user_id": 1},
    topic="test-topic",
)

RabbitMQ

await broker.publish(
    {"name": "John", "user_id": 1},
    queue="test-queue",
)

NATS

await broker.publish(
    {"name": "John", "user_id": 1},
    subject="test-subject",
)

Redis

await broker.publish(
    {"name": "John", "user_id": 1},
    channel="test-channel",
)
And this one will be delivered to the default_handler:

AIOKafka

await broker.publish(
    "Hello, FastStream!",
    topic="test-topic",
)

Confluent

await broker.publish(
    "Hello, FastStream!",
    topic="test-topic",
)

RabbitMQ

await broker.publish(
    "Hello, FastStream!",
    queue="test-queue",
)

NATS

await broker.publish(
    "Hello, FastStream!",
    subject="test-subject",
)

Redis

await broker.publish(
    "Hello, FastStream!",
    channel="test-channel",
)
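If you want to verify this routing without a running broker, you can use FastStream's in-memory test client. The sketch below assumes the AIOKafka variant from the example above (its broker, handle, and default_handler objects) and pytest with pytest-asyncio installed; the other broker modules ship analogous test clients.

import pytest
from faststream.kafka import TestKafkaBroker

@pytest.mark.asyncio
async def test_filtering():
    # Run the application against an in-memory broker: no real Kafka needed.
    async with TestKafkaBroker(broker) as test_broker:
        # The JSON payload matches the content-type filter -> handle
        await test_broker.publish({"name": "John", "user_id": 1}, topic="test-topic")
        handle.mock.assert_called_once_with({"name": "John", "user_id": 1})

        # The plain string falls through to the default subscriber
        await test_broker.publish("Hello, FastStream!", topic="test-topic")
        default_handler.mock.assert_called_once_with("Hello, FastStream!")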
Let's break down how message filtering works within the subscription mechanism.
Core Filtering Logic
Consider a simple example of a filter implementation:
for handler in subscriber.handlers:
    if await handler.filter(msg):
        return await handler.process(msg)
raise HandlerNotFoundError
This code selects the first suitable handler to process the message, which means the default handler should be registered last. If no business handler matches, the message must still be processed somehow. For that, we need a special trash handler that defines the system's default behavior for such cases.
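For illustration only (this is not FastStream's actual source), the same first-match idea with a trash handler registered last could look like the following sketch; all names here are hypothetical:

# Hypothetical first-match dispatch: the catch-all entry must stay last.
async def process_json(msg):
    ...  # business logic for JSON messages

async def process_trash(msg):
    ...  # default behavior: log and acknowledge, so the broker can move on

handlers = [
    (lambda msg: msg.content_type == "application/json", process_json),
    (lambda _msg: True, process_trash),  # matches everything
]

async def dispatch(msg):
    for matches, process in handlers:
        if matches(msg):
            return await process(msg)  # first matching handler wins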
Implementing the Default Handler
The default handler should be declared as follows:
subscriber = broker.subscriber()

@subscriber(filter=...)
async def handler(): ...

@subscriber()
async def default_handler(): ...
Here, @subscriber() is equivalent to @subscriber(filter=lambda _: True), meaning it accepts all messages. This ensures that no message goes unprocessed, even if no specific handler is found.
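Putting the pieces together, a typical trash handler simply records whatever slipped past the business filters. This is a minimal sketch assuming a Kafka broker on localhost and a "test-topic" subscription; the Logger annotation is FastStream's built-in logger injection:

from faststream import Logger
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
subscriber = broker.subscriber("test-topic")

@subscriber(filter=lambda msg: msg.content_type == "application/json")
async def handler(name: str, user_id: int, logger: Logger):
    logger.info("Got JSON message for user %s", user_id)

@subscriber()  # no filter: catches everything the handler above rejected
async def default_handler(msg: str, logger: Logger):
    # Trash handler: record the unexpected payload; after it returns, the
    # message is finalized as usual, so it is not redelivered.
    logger.warning("Unmatched message: %s", msg)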
Summary
- Handlers are checked in order, and the first matching one processes the message.
- The default handler must be placed last to ensure all messages are handled.
- @subscriber() without parameters acts as a universal handler, accepting everything.
- A trash handler must properly finalize the subscription and inform the broker about unnecessary data.
- Properly managing subscribers allows for precise message-processing control and prevents data loss.