RabbitMQ Queue/Exchange Declaration

Declaring queues and exchanges

FastStream subscribers declare and validate every RabbitMQ exchange and queue they use (publishers declare exchanges only), but sometimes you need to declare these objects manually.

RabbitBroker provides the declare_exchange and declare_queue methods for this.

from faststream import FastStream
from faststream.rabbit import (
    ExchangeType,
    RabbitBroker,
    RabbitExchange,
    RabbitQueue,
)

broker = RabbitBroker()
app = FastStream(broker)


@app.after_startup
async def declare_smth():
    await broker.declare_exchange(
        RabbitExchange(
            name="some-exchange",
            type=ExchangeType.FANOUT,
        )
    )

    await broker.declare_queue(
        RabbitQueue(
            name="some-queue",
            durable=True,
        )
    )

Each method takes a single argument (a RabbitQueue or a RabbitExchange) describing the RabbitMQ object you need. It declares or validates that object and returns the corresponding low-level aio-pika robust object (RobustQueue or RobustExchange) for you to work with.

Tip

These methods are also idempotent: calling them several times with the same arguments creates the object only once, and every later call returns the already stored object. This also gives you access to any queue or exchange that was declared automatically.
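The idempotent behavior described in the tip can be pictured with a small stand-alone model. This is only a sketch of the caching idea, not FastStream's actual implementation: QueueSpec and SketchDeclarer are hypothetical names invented for illustration, and a plain object() stands in for the aio-pika RobustQueue that the real method returns.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueSpec:
    """Hypothetical stand-in for RabbitQueue (hashable, so it can be a dict key)."""
    name: str
    durable: bool = False


class SketchDeclarer:
    """Hypothetical model of an idempotent declare_queue; not FastStream code."""

    def __init__(self) -> None:
        self._queues: dict[QueueSpec, object] = {}
        self.broker_calls = 0  # how many times the "server" was actually asked

    async def declare_queue(self, spec: QueueSpec) -> object:
        if spec not in self._queues:
            # First call for this spec: pretend to declare it on the server.
            self.broker_calls += 1
            self._queues[spec] = object()  # stand-in for a RobustQueue
        # Later calls with an equal spec return the stored object.
        return self._queues[spec]


async def main() -> tuple[bool, int]:
    declarer = SketchDeclarer()
    first = await declarer.declare_queue(QueueSpec("some-queue", durable=True))
    second = await declarer.declare_queue(QueueSpec("some-queue", durable=True))
    return first is second, declarer.broker_calls


same_object, calls = asyncio.run(main())
print(same_object, calls)
```

In this model the second call performs no new declaration and hands back the same object, which is why calling the real methods again with the same arguments is a convenient way to retrieve an already declared queue or exchange.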

Binding a queue to an exchange

You can also bind a queue to an exchange using the low-level aio-pika RobustQueue.bind method:

import aio_pika
from faststream import FastStream
from faststream.rabbit import (
    ExchangeType,
    RabbitBroker,
    RabbitExchange,
    RabbitQueue,
)

broker = RabbitBroker()
app = FastStream(broker)


some_queue = RabbitQueue(
    name="some-queue",
    durable=True,
)

some_exchange = RabbitExchange(
    name="some-exchange",
    type=ExchangeType.FANOUT,
)

@app.after_startup
async def bind_queue_exchange():
    queue: aio_pika.RobustQueue = await broker.declare_queue(
        some_queue
    )

    exchange: aio_pika.RobustExchange = await broker.declare_exchange(
        some_exchange
    )

    await queue.bind(
        exchange=exchange,
        routing_key=queue.name,  # Optional; a fanout exchange ignores the routing key
    )