RabbitQueue(name, queue_type=CLASSIC, durable=EMPTY, exclusive=False, passive=False, auto_delete=False, arguments=None, timeout=None, robust=True, bind_arguments=None, routing_key='')
Bases: NameRequired
A class to represent a RabbitMQ queue.
You can find information about all options in the official RabbitMQ documentation:
https://www.rabbitmq.com/docs/queues
Initialize the RabbitMQ queue.
:param name: RabbitMQ queue name. :param queue_type: Queue type, sent to RabbitMQ as the x-queue-type declaration argument. :param durable: Whether the queue is durable. Quorum and stream queues must be durable. :param exclusive: The queue can be used only by the current connection and will be deleted after the connection is closed. :param passive: Do not create the queue automatically. :param auto_delete: The queue will be deleted after the connection is closed. :param arguments: Queue declaration arguments. You can find information about them in the official RabbitMQ documentation: https://www.rabbitmq.com/docs/queues#optional-arguments :param timeout: Send confirmation time from RabbitMQ. :param robust: Whether to declare the queue object as restorable. :param bind_arguments: Queue-exchange binding options. :param routing_key: Explicit binding routing key. Uses the queue name if not provided.
Source code in faststream/rabbit/schemas/queue.py
def __init__(
    self,
    name: str,
    queue_type: QueueType = QueueType.CLASSIC,
    durable: bool = EMPTY,
    exclusive: bool = False,
    passive: bool = False,
    auto_delete: bool = False,
    arguments: Union[
        "QuorumQueueArgs",
        "ClassicQueueArgs",
        "StreamQueueArgs",
        "AnyDict",
        None,
    ] = None,
    timeout: "TimeoutType" = None,
    robust: bool = True,
    bind_arguments: Optional["AnyDict"] = None,
    routing_key: str = "",
) -> None:
    """Initialize the RabbitMQ queue.

    :param name: RabbitMQ queue name.
    :param queue_type: Queue type; its value is stored under the
        ``x-queue-type`` key of the declaration arguments.
    :param durable: Whether the queue is durable. When left unset it
        resolves to ``True`` for quorum and stream queues and to ``False``
        for any other queue type.
    :param exclusive: The queue can be used only by the current connection
        and will be deleted after the connection is closed.
    :param passive: Do not create the queue automatically.
    :param auto_delete: The queue will be deleted after the connection
        is closed.
    :param arguments: Queue declaration arguments. Keys given here take
        precedence over the implicit ``x-queue-type`` entry.
        You can find information about them in the official RabbitMQ
        documentation:
        https://www.rabbitmq.com/docs/queues#optional-arguments
    :param timeout: Send confirmation time from RabbitMQ.
    :param robust: Whether to declare the queue object as restorable.
    :param bind_arguments: Queue-exchange binding options.
    :param routing_key: Explicit binding routing key. Uses name if not
        presented.
    :raises SetupError: If a quorum or stream queue is explicitly
        requested as non-durable.
    """
    # compile_path hands back a (regex, routing key) pair; the patch hook
    # rewrites every escaped "#" in the text it receives into ".+".
    path_regex, resolved_key = compile_path(
        routing_key,
        replace_symbol="*",
        patch_regex=lambda pattern: pattern.replace(r"\#", ".+"),
    )

    # Quorum and stream queues are the two types that must be durable.
    must_be_durable = (
        queue_type is QueueType.QUORUM or queue_type is QueueType.STREAM
    )
    if durable is EMPTY:
        # Nothing was requested explicitly: follow the queue type.
        durable = must_be_durable
    elif must_be_durable and not durable:
        raise SetupError("Quorum and Stream queues must be durable")

    super().__init__(name)

    # Start from the queue type so caller-supplied arguments can override it.
    declare_arguments = {"x-queue-type": queue_type.value}
    declare_arguments.update(arguments or {})

    self.path_regex = path_regex
    self.durable = durable
    self.exclusive = exclusive
    self.bind_arguments = bind_arguments
    self.routing_key = resolved_key
    self.robust = robust
    self.passive = passive
    self.auto_delete = auto_delete
    self.arguments = declare_arguments
    self.timeout = timeout
routing property
Return the real routing key of the object.
path_regex instance-attribute
durable instance-attribute
exclusive instance-attribute
bind_arguments instance-attribute
routing_key instance-attribute
robust instance-attribute
passive instance-attribute
auto_delete instance-attribute
arguments instance-attribute
arguments = {'x-queue-type': queue_type.value, **(arguments or {})}
timeout instance-attribute
validate classmethod
validate(value: Union[str, NameRequiredCls], **kwargs: Any) -> NameRequiredCls
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)
Factory to create object.
Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Build an object from a plain name, or pass ``value`` through.

    A ``str`` is forwarded, together with ``kwargs``, to the class
    constructor and the new instance is returned. Any other value
    (including ``None``) is returned unchanged.
    """
    if isinstance(value, str):
        return cls(value, **kwargs)
    return value
add_prefix
Source code in faststream/rabbit/schemas/queue.py
def add_prefix(self, prefix: str) -> "RabbitQueue":
    """Return a deep copy of this queue with ``prefix`` prepended to its name.

    The routing key receives the same prefix only when it is non-empty.
    The queue the method is called on is not modified.
    """
    prefixed: "RabbitQueue" = deepcopy(self)
    prefixed.name = prefix + prefixed.name
    if not prefixed.routing_key:
        # An empty routing key stays empty.
        return prefixed
    prefixed.routing_key = prefix + prefixed.routing_key
    return prefixed