Skip to content

RabbitDeclarer

faststream.rabbit.helpers.RabbitDeclarer #

RabbitDeclarer(channel: RobustChannel)

Bases: Singleton

A class to declare RabbitMQ queues and exchanges.

METHOD DESCRIPTION
declare_queue

RabbitQueue) -> aio_pika.RobustQueue Declares a queue and returns the declared queue object.

declare_exchange

RabbitExchange) -> aio_pika.RobustExchange Declares an exchange and returns the declared exchange object.

Initialize the class.

PARAMETER DESCRIPTION
channel

Aio_pika RobustChannel object

TYPE: RobustChannel

Source code in faststream/rabbit/helpers.py
def __init__(self, channel: aio_pika.RobustChannel) -> None:
    """Initialize the class.

    Args:
        channel: Aio_pika RobustChannel object

    Attributes:
        channel: Aio_pika RobustChannel object
        queues: A dictionary to store queues
        exchanges: A dictionary to store exchanges

    """
    self.channel = channel
    self.queues = {}
    self.exchanges = {}

channel instance-attribute #

channel: RobustChannel = channel

exchanges instance-attribute #

exchanges: Dict[
    Union[RabbitExchange, str], RobustExchange
] = {}

queues instance-attribute #

queues: Dict[Union[RabbitQueue, str], RobustQueue] = {}

declare_exchange async #

declare_exchange(
    exchange: RabbitExchange,
) -> RobustExchange

Declare an exchange.

PARAMETER DESCRIPTION
exchange

RabbitExchange object representing the exchange to be declared.

TYPE: RabbitExchange

RETURNS DESCRIPTION
RobustExchange

aio_pika.RobustExchange: The declared exchange.

RAISES DESCRIPTION
NotImplementedError

If silent animals are not supported.

Source code in faststream/rabbit/helpers.py
async def declare_exchange(
    self,
    exchange: RabbitExchange,
) -> aio_pika.RobustExchange:
    """Declare an exchange.

    Args:
        exchange: RabbitExchange object representing the exchange to be declared.

    Returns:
        aio_pika.RobustExchange: The declared exchange.

    Raises:
        NotImplementedError: If silent animals are not supported.

    """
    exch = self.exchanges.get(exchange)

    if exch is None:
        exch = cast(
            aio_pika.RobustExchange,
            await self.channel.declare_exchange(
                **model_to_dict(
                    exchange,
                    exclude={
                        "routing_key",
                        "bind_arguments",
                        "bind_to",
                    },
                )
            ),
        )
        self.exchanges[exchange] = exch

    if exchange.bind_to is not None:
        parent = await self.declare_exchange(exchange.bind_to)
        await exch.bind(
            exchange=parent,
            routing_key=exchange.routing_key,
            arguments=exchange.arguments,
        )

    return exch

declare_queue async #

declare_queue(queue: RabbitQueue) -> RobustQueue

Declare a queue.

PARAMETER DESCRIPTION
queue

RabbitQueue object representing the queue to be declared.

TYPE: RabbitQueue

RETURNS DESCRIPTION
RobustQueue

aio_pika.RobustQueue: The declared queue.

Source code in faststream/rabbit/helpers.py
async def declare_queue(
    self,
    queue: RabbitQueue,
) -> aio_pika.RobustQueue:
    """Declare a queue.

    Args:
        queue: RabbitQueue object representing the queue to be declared.

    Returns:
        aio_pika.RobustQueue: The declared queue.

    """
    q = self.queues.get(queue)
    if q is None:
        q = cast(
            aio_pika.RobustQueue,
            await self.channel.declare_queue(
                **model_to_dict(
                    queue,
                    exclude={
                        "routing_key",
                        "path_regex",
                        "bind_arguments",
                    },
                )
            ),
        )
        self.queues[queue] = q
    return q