Skip to content

RabbitMQ Queue/Exchange Declaration#

FastStream subscribers declares and validates all using RabbitMQ exchanges and queues (publishers declares exchanges only), but sometimes you need to declare them manually.

RabbitBroker provides a way to achieve this easily.

from faststream import FastStream
from faststream.rabbit import (
    ExchangeType,
    RabbitBroker,
    RabbitExchange,
    RabbitQueue,
)

broker = RabbitBroker()
app = FastStream(broker)


@app.after_startup
async def declare_smth():
    await broker.declare_exchange(
        RabbitExchange(
            name="some-exchange",
            type=ExchangeType.FANOUT,
        )
    )

    await broker.declare_queue(
        RabbitQueue(
            name="some-queue",
            durable=True,
        )
    )

These methods require just one argument (RabbitQueue/RabbitExchange) containing information about your RabbitMQ required objects. They declare/validate RabbitMQ objects and return low-level aio-pika robust objects to interact with.

Tip

Also, these methods are idempotent, so you can call them with the same arguments multiple times, but the objects will be created once; next time the method will return an already stored object. This way you can get access to any queue/exchange created automatically.