Broker Router

Sometimes you want to:

- split an application into includable modules
- separate business logic from your handler registration
- apply a decoder, middleware, or dependencies to a group of subscribers

For these reasons, FastStream has a special Broker Router.
Router Usage

First, import the Broker Router from the same module you imported the broker from.
When creating a Broker Router, you can specify a prefix that will be automatically applied to all subscribers and publishers of this router.
Now you can use the created router to register handlers and publishers as if it were a regular broker.
Then include all the handlers declared with the router in your broker:
    broker.include_router(router)
Note that when publishing a message, you now need to specify the same prefix that you used when creating the router:
AIOKafka:
    await broker.publish(
        {"name": "John", "user_id": 1},
        topic="prefix_test-topic",
    )
Confluent:
    await broker.publish(
        {"name": "John", "user_id": 1},
        topic="prefix_test-topic",
    )
RabbitMQ:
    await broker.publish(
        {"name": "John", "user_id": 1},
        queue="prefix_test-queue",
    )
NATS:
    await broker.publish(
        {"name": "John", "user_id": 1},
        subject="prefix_test-subject",
    )
Redis:
    await broker.publish(
        {"name": "John", "user_id": 1},
        channel="prefix_test-channel",
    )
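To make the prefix rule concrete, here is a toy model in plain Python. It is not FastStream's implementation: `ToyRouter` and `ToyBroker` are hypothetical classes invented for this sketch, and they only imitate the naming behaviour described above, where the prefix is added at registration time and publishing looks up the full, prefixed name.

```python
from typing import Callable, Dict


class ToyRouter:
    """Hypothetical stand-in for a broker router; not part of FastStream."""

    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix
        self.handlers: Dict[str, Callable[[dict], object]] = {}

    def subscriber(self, name: str):
        def register(func):
            # The prefix is prepended once, at registration time.
            self.handlers[self.prefix + name] = func
            return func

        return register


class ToyBroker(ToyRouter):
    """Hypothetical stand-in for a broker; dispatches by exact name."""

    def include_router(self, router: ToyRouter) -> None:
        # Copy handlers over under their already-prefixed names.
        for name, func in router.handlers.items():
            self.handlers[self.prefix + name] = func

    def publish(self, message: dict, topic: str):
        # Lookup uses the full name, so callers must include the prefix.
        return self.handlers[topic](message)


router = ToyRouter(prefix="prefix_")


@router.subscriber("test-topic")
def handle(message: dict) -> str:
    return f"Hi, {message['name']}!"


broker = ToyBroker()
broker.include_router(router)

reply = broker.publish({"name": "John", "user_id": 1}, topic="prefix_test-topic")
print(reply)  # Hi, John!
```

In this model, publishing to the bare name "test-topic" raises a KeyError, which mirrors why the publish calls above use the prefixed destination.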
Delay Handler Registration

If you want to separate your application's core logic from FastStream's routing logic, you can write core functions first and register them as Broker Router handlers later:
AIOKafka:
    from faststream.kafka import KafkaRoute, KafkaRouter, KafkaPublisher

    # Core logic: a plain function with no FastStream decorators
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"

    # Routing: register the function as a subscriber and attach a publisher
    router = KafkaRouter(
        handlers=(
            KafkaRoute(
                handle,
                "test-topic",
                publishers=(
                    KafkaPublisher("outer-topic"),
                ),
            ),
        )
    )
The example above is identical to the following one:
    @router.subscriber("test-topic")
    @router.publisher("outer-topic")
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"
Confluent:
    from faststream.confluent import KafkaRoute, KafkaRouter, KafkaPublisher

    # Core logic: a plain function with no FastStream decorators
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"

    # Routing: register the function as a subscriber and attach a publisher
    router = KafkaRouter(
        handlers=(
            KafkaRoute(
                handle,
                "test-topic",
                publishers=(
                    KafkaPublisher("outer-topic"),
                ),
            ),
        )
    )
The example above is identical to the following one:
    @router.subscriber("test-topic")
    @router.publisher("outer-topic")
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"
RabbitMQ:
    from faststream.rabbit import RabbitRoute, RabbitRouter, RabbitPublisher

    # Core logic: a plain function with no FastStream decorators
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"

    # Routing: register the function as a subscriber and attach a publisher
    router = RabbitRouter(
        handlers=(
            RabbitRoute(
                handle,
                "test-queue",
                publishers=(
                    RabbitPublisher("outer-queue"),
                ),
            ),
        )
    )
The example above is identical to the following one:
    @router.subscriber("test-queue")
    @router.publisher("outer-queue")
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"
NATS:
    from faststream.nats import NatsRoute, NatsRouter, NatsPublisher

    # Core logic: a plain function with no FastStream decorators
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"

    # Routing: register the function as a subscriber and attach a publisher
    router = NatsRouter(
        handlers=(
            NatsRoute(
                handle,
                "test-subject",
                publishers=(
                    NatsPublisher("outer-subject"),
                ),
            ),
        )
    )
The example above is identical to the following one:
    @router.subscriber("test-subject")
    @router.publisher("outer-subject")
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"
Redis:
    from faststream.redis import RedisRouter, RedisRoute, RedisPublisher

    # Core logic: a plain function with no FastStream decorators
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"

    # Routing: register the function as a subscriber and attach a publisher
    router = RedisRouter(
        handlers=(
            RedisRoute(
                handle,
                "test-channel",
                publishers=(
                    RedisPublisher("outer-channel"),
                ),
            ),
        )
    )
The example above is identical to the following one:
    @router.subscriber("test-channel")
    @router.publisher("outer-channel")
    async def handle(name: str, user_id: int):
        assert name == "John"
        assert user_id == 1
        return "Hi!"
Warning

Be careful: handlers registered this way cannot be tested with a mock object.
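One way to keep such handlers covered by tests, offered here as a suggestion rather than something this page prescribes: because the function is passed to the route object instead of being decorated, its name still refers to the plain coroutine function, so it can be called directly. The sketch below uses only the standard library and assumes the same `handle` function as in the examples above.

```python
import asyncio


async def handle(name: str, user_id: int) -> str:
    # Same core function as in the examples above, with no router involved.
    assert name == "John"
    assert user_id == 1
    return "Hi!"


# Call the coroutine function directly; no broker or router is needed.
result = asyncio.run(handle("John", 1))
print(result)  # Hi!
```

This checks only the core logic. It does not exercise message decoding, routing, or the attached publisher.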