Skip to content

RabbitQueue

faststream.rabbit.RabbitQueue #

RabbitQueue(name, queue_type=CLASSIC, durable=EMPTY, exclusive=False, passive=False, auto_delete=False, arguments=None, timeout=None, robust=True, bind_arguments=None, routing_key='')

Bases: NameRequired

A class to represent a RabbitMQ queue.

You can find information about all options in the official RabbitMQ documentation:

https://www.rabbitmq.com/docs/queues

Initialize the RabbitMQ queue.

:param name: RabbitMQ queue name. :param durable: Whether the object is durable. :param exclusive: The queue can be used only in the current connection and will be deleted after connection closed. :param passive: Do not create queue automatically. :param auto_delete: The queue will be deleted after connection closed. :param arguments: Queue declaration arguments. You can find information about them in the official RabbitMQ documentation: https://www.rabbitmq.com/docs/queues#optional-arguments :param timeout: Send confirmation time from RabbitMQ. :param robust: Whether to declare queue object as restorable. :param bind_arguments: Queue-exchange binding options. :param routing_key: Explicit binding routing key. Uses name if not presented.

Source code in faststream/rabbit/schemas/queue.py
def __init__(
    self,
    name: str,
    queue_type: QueueType = QueueType.CLASSIC,
    durable: bool = EMPTY,
    exclusive: bool = False,
    passive: bool = False,
    auto_delete: bool = False,
    arguments: Union[
        "QuorumQueueArgs",
        "ClassicQueueArgs",
        "StreamQueueArgs",
        "AnyDict",
        None,
    ] = None,
    timeout: "TimeoutType" = None,
    robust: bool = True,
    bind_arguments: Optional["AnyDict"] = None,
    routing_key: str = "",
) -> None:
    """Initialize the RabbitMQ queue.

    :param name: RabbitMQ queue name.
    :param durable: Whether the object is durable.
    :param exclusive: The queue can be used only in the current connection and will be deleted after connection closed.
    :param passive: Do not create queue automatically.
    :param auto_delete: The queue will be deleted after connection closed.
    :param arguments: Queue declaration arguments.
                      You can find information about them in the official RabbitMQ documentation:
                      https://www.rabbitmq.com/docs/queues#optional-arguments
    :param timeout: Send confirmation time from RabbitMQ.
    :param robust: Whether to declare queue object as restorable.
    :param bind_arguments: Queue-exchange binding options.
    :param routing_key: Explicit binding routing key. Uses name if not presented.
    """
    re, routing_key = compile_path(
        routing_key,
        replace_symbol="*",
        patch_regex=lambda x: x.replace(r"\#", ".+"),
    )

    if queue_type is QueueType.QUORUM or queue_type is QueueType.STREAM:
        if durable is EMPTY:
            durable = True
        elif not durable:
            raise SetupError("Quorum and Stream queues must be durable")
    elif durable is EMPTY:
        durable = False

    super().__init__(name)

    self.path_regex = re
    self.durable = durable
    self.exclusive = exclusive
    self.bind_arguments = bind_arguments
    self.routing_key = routing_key
    self.robust = robust
    self.passive = passive
    self.auto_delete = auto_delete
    self.arguments = {"x-queue-type": queue_type.value, **(arguments or {})}
    self.timeout = timeout

name instance-attribute #

name = name

routing property #

routing

Return real routing_key of object.

path_regex instance-attribute #

path_regex = re

durable instance-attribute #

durable = durable

exclusive instance-attribute #

exclusive = exclusive

bind_arguments instance-attribute #

bind_arguments = bind_arguments

routing_key instance-attribute #

routing_key = routing_key

robust instance-attribute #

robust = robust

passive instance-attribute #

passive = passive

auto_delete instance-attribute #

auto_delete = auto_delete

arguments instance-attribute #

arguments = {'x-queue-type': value, None: arguments or {}}

timeout instance-attribute #

timeout = timeout

validate classmethod #

validate(value: Union[str, NameRequiredCls], **kwargs: Any) -> NameRequiredCls
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)

Factory to create object.

Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Factory to create object."""
    if value is not None and isinstance(value, str):
        value = cls(value, **kwargs)
    return value

add_prefix #

add_prefix(prefix)
Source code in faststream/rabbit/schemas/queue.py
def add_prefix(self, prefix: str) -> "RabbitQueue":
    new_q: RabbitQueue = deepcopy(self)

    new_q.name = "".join((prefix, new_q.name))

    if new_q.routing_key:
        new_q.routing_key = "".join((prefix, new_q.routing_key))

    return new_q