Bases: Singleton
A class to declare RabbitMQ queues and exchanges.
METHOD | DESCRIPTION |
declare_queue | RabbitQueue) -> aio_pika.RobustQueue Declares a queue and returns the declared queue object. |
declare_exchange | RabbitExchange) -> aio_pika.RobustExchange Declares an exchange and returns the declared exchange object. |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Initialize the class.
PARAMETER | DESCRIPTION |
channel | Aio_pika RobustChannel object TYPE: RobustChannel |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/rabbit/helpers.py
| def __init__(self, channel: aio_pika.RobustChannel) -> None:
"""Initialize the class.
Args:
channel: Aio_pika RobustChannel object
Attributes:
channel: Aio_pika RobustChannel object
queues: A dictionary to store queues
exchanges: A dictionary to store exchanges
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
self.channel = channel
self.queues = {}
self.exchanges = {}
|
channel instance-attribute
channel: aio_pika.RobustChannel = channel
exchanges instance-attribute
exchanges: Dict[
Union[RabbitExchange, str], aio_pika.RobustExchange
] = {}
queues instance-attribute
queues: Dict[
Union[RabbitQueue, str], aio_pika.RobustQueue
] = {}
declare_exchange async
declare_exchange(
exchange: RabbitExchange,
) -> aio_pika.RobustExchange
Declare an exchange.
PARAMETER | DESCRIPTION |
exchange | RabbitExchange object representing the exchange to be declared. TYPE: RabbitExchange |
RETURNS | DESCRIPTION |
RobustExchange | aio_pika.RobustExchange: The declared exchange. |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/rabbit/helpers.py
| async def declare_exchange(
self,
exchange: RabbitExchange,
) -> aio_pika.RobustExchange:
"""Declare an exchange.
Args:
exchange: RabbitExchange object representing the exchange to be declared.
Returns:
aio_pika.RobustExchange: The declared exchange.
Raises:
NotImplementedError: If silent animals are not supported.
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
exch = self.exchanges.get(exchange)
if exch is None:
exch = cast(
aio_pika.RobustExchange,
await self.channel.declare_exchange(
**model_to_dict(
exchange,
exclude={
"routing_key",
"bind_arguments",
"bind_to",
},
)
),
)
self.exchanges[exchange] = exch
if exchange.bind_to is not None:
parent = await self.declare_exchange(exchange.bind_to)
await exch.bind(
exchange=parent,
routing_key=exchange.routing_key,
arguments=exchange.arguments,
)
return exch
|
declare_queue async
declare_queue(queue: RabbitQueue) -> aio_pika.RobustQueue
Declare a queue.
PARAMETER | DESCRIPTION |
queue | RabbitQueue object representing the queue to be declared. TYPE: RabbitQueue |
RETURNS | DESCRIPTION |
RobustQueue | aio_pika.RobustQueue: The declared queue. |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/rabbit/helpers.py
| async def declare_queue(
self,
queue: RabbitQueue,
) -> aio_pika.RobustQueue:
"""Declare a queue.
Args:
queue: RabbitQueue object representing the queue to be declared.
Returns:
aio_pika.RobustQueue: The declared queue.
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
q = self.queues.get(queue)
if q is None:
q = cast(
aio_pika.RobustQueue,
await self.channel.declare_queue(
**model_to_dict(
queue,
exclude={
"routing_key",
"path_regex",
"bind_arguments",
},
)
),
)
self.queues[queue] = q
return q
|