Application-level Filtering

FastStream also lets you choose how a message is processed based on its headers, body type, or any other property of the message. The `filter`
feature enables you to consume messages with different schemas within a single event stream.
Tip
A message must be consumed exactly ONCE (overlapping filters are not allowed).
As an example, let's create a subscriber for both JSON and non-JSON messages:
AIOKafka Confluent RabbitMQ NATS Redis
# AIOKafka: split JSON and non-JSON messages from one topic between two handlers.
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# A single subscriber object; each handler below is attached to it.
subscriber = broker.subscriber("test-topic")


def _is_json(msg):
    # Accept only messages whose content type is JSON.
    return msg.content_type == "application/json"


@subscriber(filter=_is_json)
async def handle(name: str, user_id: int):
    # Runs only for messages accepted by the JSON filter.
    assert name == "John"
    assert user_id == 1


@subscriber
async def default_handler(msg: str):
    # Registered without a filter, so it acts as the default handler.
    assert msg == "Hello, FastStream!"
# Confluent: split JSON and non-JSON messages from one topic between two handlers.
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# A single subscriber object; each handler below is attached to it.
subscriber = broker.subscriber("test-topic")


def _is_json(msg):
    # Accept only messages whose content type is JSON.
    return msg.content_type == "application/json"


@subscriber(filter=_is_json)
async def handle(name: str, user_id: int):
    # Runs only for messages accepted by the JSON filter.
    assert name == "John"
    assert user_id == 1


@subscriber
async def default_handler(msg: str):
    # Registered without a filter, so it acts as the default handler.
    assert msg == "Hello, FastStream!"
# RabbitMQ: split JSON and non-JSON messages from one queue between two handlers.
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

# A single subscriber object; each handler below is attached to it.
subscriber = broker.subscriber("test-queue")


def _is_json(msg):
    # Accept only messages whose content type is JSON.
    return msg.content_type == "application/json"


@subscriber(filter=_is_json)
async def handle(name: str, user_id: int):
    # Runs only for messages accepted by the JSON filter.
    assert name == "John"
    assert user_id == 1


@subscriber
async def default_handler(msg: str):
    # Registered without a filter, so it acts as the default handler.
    assert msg == "Hello, FastStream!"
# NATS: split JSON and non-JSON messages from one subject between two handlers.
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

# A single subscriber object; each handler below is attached to it.
subscriber = broker.subscriber("test-subject")


def _is_json(msg):
    # Accept only messages whose content type is JSON.
    return msg.content_type == "application/json"


@subscriber(filter=_is_json)
async def handle(name: str, user_id: int):
    # Runs only for messages accepted by the JSON filter.
    assert name == "John"
    assert user_id == 1


@subscriber
async def default_handler(msg: str):
    # Registered without a filter, so it acts as the default handler.
    assert msg == "Hello, FastStream!"
# Redis: split JSON and non-JSON messages from one channel between two handlers.
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

# A single subscriber object; each handler below is attached to it.
subscriber = broker.subscriber("test-channel")


def _is_json(msg):
    # Accept only messages whose content type is JSON.
    return msg.content_type == "application/json"


@subscriber(filter=_is_json)
async def handle(name: str, user_id: int):
    # Runs only for messages accepted by the JSON filter.
    assert name == "John"
    assert user_id == 1


@subscriber
async def default_handler(msg: str):
    # Registered without a filter, so it acts as the default handler.
    assert msg == "Hello, FastStream!"
Note
A subscriber without a filter is a default subscriber. It consumes messages that have not been consumed yet.
With this setup, the following message will be delivered to the `handle` function:
AIOKafka Confluent RabbitMQ NATS Redis
await broker . publish (
{ "name" : "John" , "user_id" : 1 },
topic = "test-topic" ,
)
await broker . publish (
{ "name" : "John" , "user_id" : 1 },
topic = "test-topic" ,
)
await broker . publish (
{ "name" : "John" , "user_id" : 1 },
queue = "test-queue" ,
)
await broker . publish (
{ "name" : "John" , "user_id" : 1 },
subject = "test-subject" ,
)
await broker . publish (
{ "name" : "John" , "user_id" : 1 },
channel = "test-channel" ,
)
And this one will be delivered to the `default_handler` function:
AIOKafka Confluent RabbitMQ NATS Redis
await broker . publish (
"Hello, FastStream!" ,
topic = "test-topic" ,
)
await broker . publish (
"Hello, FastStream!" ,
topic = "test-topic" ,
)
await broker . publish (
"Hello, FastStream!" ,
queue = "test-queue" ,
)
await broker . publish (
"Hello, FastStream!" ,
subject = "test-subject" ,
)
await broker . publish (
"Hello, FastStream!" ,
channel = "test-channel" ,
)