Skip to content

Broker Router#

Sometimes you want to:

  • split an application into includable modules
  • separate business logic from your handler registration
  • apply some decoder/middleware/dependencies to a subscribers group

For these reasons, FastStream has a special Broker Router.

Router Usage#

First, you need to import the Broker Router from the same module from where you imported the broker.

When creating a Broker Router, you can specify a prefix that will be automatically applied to all subscribers and publishers of this router.

from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaRouter

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
router = KafkaRouter(prefix="prefix_")
from faststream import FastStream
from faststream.confluent import KafkaBroker, KafkaRouter

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
router = KafkaRouter(prefix="prefix_")
from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitRouter

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
router = RabbitRouter(prefix="prefix_")
from faststream import FastStream
from faststream.nats import NatsBroker, NatsRouter

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)
router = NatsRouter(prefix="prefix_")
from faststream import FastStream
from faststream.redis import RedisBroker, RedisRouter

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)
router = RedisRouter(prefix="prefix_")

Now you can use the created router to register handlers and publishers as if it were a regular broker

@router.subscriber("test-topic")
@router.publisher("another-topic")
async def handle(name: str, user_id: int) -> str:
    assert name == "John"
    assert user_id == 1
    return "Hi!"


@router.subscriber("another-topic")
async def handle_response(msg: str):
    assert msg == "Hi!"
@router.subscriber("test-topic")
@router.publisher("another-topic")
async def handle(name: str, user_id: int) -> str:
    assert name == "John"
    assert user_id == 1
    return "Hi!"


@router.subscriber("another-topic")
async def handle_response(msg: str):
    assert msg == "Hi!"
@router.subscriber("test-queue")
@router.publisher("another-queue")
async def handle(name: str, user_id: int) -> str:
    assert name == "John"
    assert user_id == 1
    return "Hi!"


@router.subscriber("another-queue")
async def handle_response(msg: str):
    assert msg == "Hi!"
@router.subscriber("test-subject")
@router.publisher("another-subject")
async def handle(name: str, user_id: int) -> str:
    assert name == "John"
    assert user_id == 1
    return "Hi!"


@router.subscriber("another-subject")
async def handle_response(msg: str):
    assert msg == "Hi!"
@router.subscriber("test-channel")
@router.publisher("another-channel")
async def handle(name: str, user_id: int) -> str:
    assert name == "John"
    assert user_id == 1
    return "Hi!"


@router.subscriber("another-channel")
async def handle_response(msg: str):
    assert msg == "Hi!"

Then you can simply include all the handlers declared using the router in your broker

broker.include_router(router)

Please note that when publishing a message, you now need to specify the same prefix that you used when creating the router

await broker.publish(
    {"name": "John", "user_id": 1},
    topic="prefix_test-topic",
)
await broker.publish(
    {"name": "John", "user_id": 1},
    topic="prefix_test-topic",
)
await broker.publish(
    {"name": "John", "user_id": 1},
    queue="prefix_test-queue",
)
await broker.publish(
    {"name": "John", "user_id": 1},
    subject="prefix_test-subject",
)
await broker.publish(
    {"name": "John", "user_id": 1},
    channel="prefix_test-channel",
)

Tip

Also, when creating a Broker Router, you can specify middleware, dependencies, parser and decoder to apply them to all subscribers declared via this router.

Delay Handler Registration#

If you want to separate your application's core logic from FastStream's routing logic, you can write some core functions and use them as Broker Router handlers later:

from faststream.kafka import KafkaRoute, KafkaRouter, KafkaPublisher

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
    return "Hi!"

router = KafkaRouter(
    handlers=(
        KafkaRoute(
            handle,
            "test-topic",
            publishers=(
                KafkaPublisher("outer-topic"),
            ),
        ),
    )
)

Above example is identical to the following one:

1
2
3
4
5
6
@router.subscriber("test-topic")
@router.publisher("outer-topic")
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
    return "Hi!"
from faststream.confluent import KafkaRoute, KafkaRouter, KafkaPublisher

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
    return "Hi!"

router = KafkaRouter(
    handlers=(
        KafkaRoute(
            handle,
            "test-topic",
            publishers=(
                KafkaPublisher("outer-topic"),
            ),
        ),
    )
)

Above example is identical to the following one:

1
2
3
4
5
6
@router.subscriber("test-topic")
@router.publisher("outer-topic")
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
    return "Hi!"
from faststream.rabbit import RabbitRoute, RabbitRouter, RabbitPublisher

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
    return "Hi!"

router = RabbitRouter(
    handlers=(
        RabbitRoute(
            handle,
            "test-queue",
            publishers=(
                RabbitPublisher("outer-queue"),
            )
        ),
    )
)

Above example is identical to the following one:

1
2
3
4
5
6
@router.subscriber("test-queue")
@router.publisher("outer-queue")
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
    return "Hi!"
from faststream.nats import NatsRoute, NatsRouter, NatsPublisher

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
    return "Hi!"

router = NatsRouter(
    handlers=(
        NatsRoute(
            handle,
            "test-subject",
            publishers=(
                NatsPublisher("outer-subject"),
            ),
        ),
    )
)

Above example is identical to the following one:

1
2
3
4
5
6
@router.subscriber("test-subject")
@router.publisher("outer-subject")
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
    return "Hi!"
from faststream.redis import RedisRouter, RedisRoute, RedisPublisher

async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
    return "Hi!"

router = RedisRouter(
    handlers=(
        RedisRoute(
            handle,
            "test-channel",
            publishers=(
                RedisPublisher("outer-channel"),
            )
        ),
    )
)

Above example is identical to the following one:

1
2
3
4
5
@router.subscriber("test-channel")
@router.publisher("outer-channel")
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

Warning

Be careful, this way you won't be able to test your handlers with a mock object.