Broker Router
Sometimes you want to:
- split an application into includable modules
- separate business logic from your handler registration
- apply a decoder, middleware, or dependencies to a group of subscribers
For these reasons, FastStream has a special Broker Router.
Router Usage
First, import the Broker Router from the same module from which you imported the broker.
When creating a Broker Router, you can specify a prefix that will be automatically applied to all subscribers and publishers of this router.
Now you can use the created router to register handlers and publishers as if it were a regular broker.
Then you can simply include all the handlers declared using the router in your broker:
broker.include_router(router)
Please note that when publishing a message, you must now include the router's prefix in the destination name, exactly as you specified it when creating the router:
AIOKafka
await broker.publish(
    {"name": "John", "user_id": 1},
    topic="prefix_test-topic",
)
Confluent
await broker.publish(
    {"name": "John", "user_id": 1},
    topic="prefix_test-topic",
)
RabbitMQ
await broker.publish(
    {"name": "John", "user_id": 1},
    queue="prefix_test-queue",
)
NATS
await broker.publish(
    {"name": "John", "user_id": 1},
    subject="prefix_test-subject",
)
Redis
await broker.publish(
    {"name": "John", "user_id": 1},
    channel="prefix_test-channel",
)
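To see why the prefixed name is required when publishing, here is a toy model in plain Python. It is not FastStream's implementation: `MiniRouter` and `MiniBroker` are hypothetical names made up for this illustration, and the only behavior taken from the text above is that the router's prefix is prepended to each destination name.

```python
from typing import Callable, Dict


class MiniRouter:
    """Hypothetical stand-in for a Broker Router (not a FastStream class)."""

    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix
        self.handlers: Dict[str, Callable[[dict], object]] = {}

    def subscriber(self, name: str):
        # Store the handler under the prefixed destination name.
        def decorator(func):
            self.handlers[self.prefix + name] = func
            return func

        return decorator


class MiniBroker:
    """Hypothetical stand-in for a broker (not a FastStream class)."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[[dict], object]] = {}

    def include_router(self, router: MiniRouter) -> None:
        # Copy every handler declared on the router into the broker.
        self.handlers.update(router.handlers)

    def publish(self, message: dict, topic: str):
        # Dispatch only if a handler is registered under this exact name.
        handler = self.handlers.get(topic)
        return handler(message) if handler else None


router = MiniRouter(prefix="prefix_")


@router.subscriber("test-topic")
def handle(message: dict) -> str:
    return f"Hi, {message['name']}!"


broker = MiniBroker()
broker.include_router(router)

# The prefixed name reaches the handler; the bare name matches nothing.
delivered = broker.publish({"name": "John", "user_id": 1}, topic="prefix_test-topic")
missed = broker.publish({"name": "John", "user_id": 1}, topic="test-topic")
```

In this sketch `delivered` holds the handler's reply, while `missed` is `None` because nothing is registered under the unprefixed name.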
Delay Handler Registration
If you want to separate your application's core logic from FastStream's routing logic, you can write some core functions and use them as Broker Router handlers later:
AIOKafka
from faststream.kafka import KafkaRoute, KafkaRouter

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

router = KafkaRouter(
    handlers=(
        KafkaRoute(handle, "test-topic"),
    )
)
Confluent
from faststream.confluent import KafkaRoute, KafkaRouter

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

router = KafkaRouter(
    handlers=(
        KafkaRoute(handle, "test-topic"),
    )
)
RabbitMQ
from faststream.rabbit import RabbitRoute, RabbitRouter

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

router = RabbitRouter(
    handlers=(
        RabbitRoute(handle, "test-queue"),
    )
)
NATS
from faststream.nats import NatsRoute, NatsRouter

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

router = NatsRouter(
    handlers=(
        NatsRoute(handle, "test-subject"),
    )
)
Redis
from faststream.redis import RedisRouter, RedisRoute

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

router = RedisRouter(
    handlers=(
        RedisRoute(handle, "test-channel"),
    )
)
Warning
Be careful: handlers registered this way cannot be tested with a mock object.
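Because the core function stays an ordinary coroutine, its logic can still be exercised directly with plain `asyncio`, without going through the router at all. The sketch below assumes the same `handle` function as in the examples above; `check_handle` is a hypothetical helper name used only for this illustration, and this approach checks the core logic only, not the routing.

```python
import asyncio


async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1


async def check_handle() -> bool:
    # Call the core function directly; no broker or router is involved.
    await handle("John", 1)

    # Unexpected input should trip the assertions inside the function.
    try:
        await handle("Jane", 2)
    except AssertionError:
        return True
    return False


result = asyncio.run(check_handle())
```

Here `result` is `True` when the valid call passes and the invalid call is rejected by the function's own assertions.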