
Broker Router

Sometimes you want to:

  • split an application into includable modules
  • separate business logic from your handler registration
  • apply a shared decoder, middleware, or set of dependencies to a group of subscribers

For these cases, FastStream provides a dedicated Broker Router.

Router Usage

First, import the Broker Router from the same module you imported the broker from.

When creating a Broker Router, you can specify a prefix that will be automatically applied to all subscribers and publishers of this router.

Kafka:

from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaRouter

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
router = KafkaRouter(prefix="prefix_")

RabbitMQ:

from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitRouter

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
router = RabbitRouter(prefix="prefix_")

NATS:

from faststream import FastStream
from faststream.nats import NatsBroker, NatsRouter

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)
router = NatsRouter(prefix="prefix_")

Now you can use the router to register handlers and publishers as if it were a regular broker:

Kafka:

@router.subscriber("test-topic")
@router.publisher("another-topic")
async def handle(name: str, user_id: int) -> str:
    assert name == "John"
    assert user_id == 1
    return "Hi!"


@router.subscriber("another-topic")
async def handle_response(msg: str):
    assert msg == "Hi!"

RabbitMQ:

@router.subscriber("test-queue")
@router.publisher("another-queue")
async def handle(name: str, user_id: int) -> str:
    assert name == "John"
    assert user_id == 1
    return "Hi!"


@router.subscriber("another-queue")
async def handle_response(msg: str):
    assert msg == "Hi!"

NATS:

@router.subscriber("test-subject")
@router.publisher("another-subject")
async def handle(name: str, user_id: int) -> str:
    assert name == "John"
    assert user_id == 1
    return "Hi!"


@router.subscriber("another-subject")
async def handle_response(msg: str):
    assert msg == "Hi!"

Then include all the handlers declared on the router in your broker:

broker.include_router(router)

Note that when publishing a message, the destination name must now include the prefix you set when creating the router:

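Kafka:

# Publish once the application has started (any async context works).
@app.after_startup
async def publish_example():
    await broker.publish(
        {"name": "John", "user_id": 1},
        topic="prefix_test-topic",
    )

RabbitMQ:

# Publish once the application has started (any async context works).
@app.after_startup
async def publish_example():
    await broker.publish(
        {"name": "John", "user_id": 1},
        queue="prefix_test-queue",
    )

NATS:

# Publish once the application has started (any async context works).
@app.after_startup
async def publish_example():
    await broker.publish(
        {"name": "John", "user_id": 1},
        subject="prefix_test-subject",
    )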
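
To see why the prefixed name is required, here is a toy model of how a router prefix could be combined with subscriber names when the router is included. It is not FastStream's implementation: `ToyRouter`, `ToyBroker`, and their methods are hypothetical names used only for illustration, and the model uses only the standard library.

```python
from typing import Callable, Dict


class ToyRouter:
    """Hypothetical stand-in for a Broker Router (not FastStream code)."""

    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix
        self.handlers: Dict[str, Callable[[dict], str]] = {}

    def subscriber(self, name: str):
        # Store the handler under the prefixed destination name.
        def decorator(func):
            self.handlers[self.prefix + name] = func
            return func

        return decorator


class ToyBroker:
    """Hypothetical in-memory broker that dispatches by exact name."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[[dict], str]] = {}

    def include_router(self, router: ToyRouter) -> None:
        # Copy the router's handlers, prefixed names included.
        self.handlers.update(router.handlers)

    def publish(self, message: dict, destination: str) -> str:
        # Raises KeyError if nothing is subscribed to `destination`.
        return self.handlers[destination](message)


router = ToyRouter(prefix="prefix_")


@router.subscriber("test-topic")
def handle(message: dict) -> str:
    return f"Hi, {message['name']}!"


broker = ToyBroker()
broker.include_router(router)

# The handler is reachable only under the prefixed name.
reply = broker.publish({"name": "John", "user_id": 1}, "prefix_test-topic")
```

In this model, publishing to the unprefixed name `"test-topic"` finds no handler, which mirrors the reason the prefix has to be part of the destination.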

Tip

When creating a Broker Router, you can also specify middlewares, dependencies, a parser, and a decoder. They are applied to all subscribers declared via this router.

Delay Handler Registration

If you want to separate your application's core logic from FastStream's routing logic, you can write some core functions and use them as Broker Router handlers later:

Kafka:

from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaRoute, KafkaRouter

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1


router = KafkaRouter(handlers=(KafkaRoute(handle, "test-topic"),))

broker.include_router(router)

RabbitMQ:

from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitRoute, RabbitRouter

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1


router = RabbitRouter(handlers=(RabbitRoute(handle, "test-queue"),))

broker.include_router(router)

NATS:

from faststream import FastStream
from faststream.nats import NatsBroker, NatsRoute, NatsRouter

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1


router = NatsRouter(handlers=(NatsRoute(handle, "test-subject"),))

broker.include_router(router)

Warning

Be careful: handlers registered this way cannot be tested with a mock object.
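
Because `handle` remains a plain coroutine function when it is passed to a route object, one option is to test the core logic by calling it directly, without a broker. The sketch below uses only the standard library. `call_handle_directly` is a hypothetical helper name, and nothing here exercises FastStream's routing or message decoding.

```python
import asyncio


async def handle(name: str, user_id: int) -> None:
    # Same core logic as in the examples above.
    assert name == "John"
    assert user_id == 1


def call_handle_directly(name: str, user_id: int) -> bool:
    """Return True if `handle` accepts the arguments, False otherwise."""
    try:
        asyncio.run(handle(name, user_id))
    except AssertionError:
        return False
    return True


accepted = call_handle_directly("John", 1)
rejected = call_handle_directly("Jane", 2)
```

This covers only the function body. Checking that a message published to the broker actually reaches the handler still needs a separate, broker-level test.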


Last update: 2023-09-21