Skip to content

RabbitQueue

faststream.rabbit.schemas.queue.RabbitQueue #

RabbitQueue(name, durable=False, exclusive=False, passive=False, auto_delete=False, arguments=None, timeout=None, robust=True, bind_arguments=None, routing_key='')

Bases: NameRequired

A class to represent a RabbitMQ queue.

You can find information about all options in the official RabbitMQ documentation:

https://www.rabbitmq.com/docs/queues

Source code in faststream/rabbit/schemas/queue.py
def __init__(
    self,
    name: Annotated[
        str,
        Doc("RabbitMQ queue name."),
    ],
    durable: Annotated[
        bool,
        Doc("Whether the object is durable."),
    ] = False,
    exclusive: Annotated[
        bool,
        Doc(
            "The queue can be used only in the current connection "
            "and will be deleted after connection closed."
        ),
    ] = False,
    passive: Annotated[
        bool,
        Doc("Do not create queue automatically."),
    ] = False,
    auto_delete: Annotated[
        bool,
        Doc("The queue will be deleted after its last consumer unsubscribes."),
    ] = False,
    arguments: Annotated[
        Optional["AnyDict"],
        Doc(
            "Queue declaration arguments. "
            "You can find information about them in the official RabbitMQ documentation: https://www.rabbitmq.com/docs/queues#optional-arguments"
        ),
    ] = None,
    timeout: Annotated[
        "TimeoutType",
        Doc("Send confirmation time from RabbitMQ."),
    ] = None,
    robust: Annotated[
        bool,
        Doc("Whether to declare queue object as restorable."),
    ] = True,
    bind_arguments: Annotated[
        Optional["AnyDict"],
        Doc("Queue-exchange binding options."),
    ] = None,
    routing_key: Annotated[
        str,
        Doc("Explicit binding routing key. Uses `name` if not presented."),
    ] = "",
) -> None:
    """Store the queue declaration and binding options on the instance.

    The routing key is first passed through `compile_path`; the key kept in
    `self.routing_key` is the one that helper returns, and the regex it
    returns alongside is kept in `self.path_regex`.
    """
    # `compile_path` is defined outside this block and returns a
    # (regex, routing key) pair. The `patch_regex` hook rewrites the literal
    # `\#` to `.+` in the value it receives.
    # NOTE(review): presumably this makes the AMQP topic wildcard `#` match in
    # the generated regex — confirm against `compile_path`.
    re, routing_key = compile_path(
        routing_key,
        replace_symbol="*",
        patch_regex=lambda x: x.replace(r"\#", ".+"),
    )

    # The base class initializer receives the queue name.
    super().__init__(name)

    self.path_regex = re
    self.durable = durable
    self.exclusive = exclusive
    self.bind_arguments = bind_arguments
    self.routing_key = routing_key
    self.robust = robust
    self.passive = passive
    self.auto_delete = auto_delete
    self.arguments = arguments
    self.timeout = timeout

name instance-attribute #

name = name

routing property #

routing

Return the real routing_key of the object.

path_regex instance-attribute #

path_regex = re

durable instance-attribute #

durable = durable

exclusive instance-attribute #

exclusive = exclusive

bind_arguments instance-attribute #

bind_arguments = bind_arguments

routing_key instance-attribute #

routing_key = routing_key

robust instance-attribute #

robust = robust

passive instance-attribute #

passive = passive

auto_delete instance-attribute #

auto_delete = auto_delete

arguments instance-attribute #

arguments = arguments

timeout instance-attribute #

timeout = timeout

validate classmethod #

validate(value: Union[str, NameRequiredCls], **kwargs: Any) -> NameRequiredCls
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)

Factory to create object.

Source code in faststream/broker/schemas.py
@classmethod
def validate(
    cls: Type[NameRequiredCls],
    value: Union[str, NameRequiredCls, None],
    **kwargs: Any,
) -> Optional[NameRequiredCls]:
    """Factory to create object.

    A string is treated as the object name and passed to the class
    constructor together with ``kwargs``. Any other value, including
    ``None``, is returned unchanged.
    """
    # Only plain names need wrapping; `None` is not a `str`, so it falls
    # through to the pass-through return below.
    if isinstance(value, str):
        return cls(value, **kwargs)
    return value

add_prefix #

add_prefix(prefix)
Source code in faststream/rabbit/schemas/queue.py
def add_prefix(self, prefix: str) -> "RabbitQueue":
    """Return a deep copy of this queue with `prefix` prepended to its name.

    The routing key gets the same prefix, but only when it is non-empty.
    The original object is left untouched.
    """
    prefixed: RabbitQueue = deepcopy(self)
    prefixed.name = prefix + prefixed.name

    # An empty routing key stays empty rather than becoming the bare prefix.
    # NOTE(review): every other attribute, `path_regex` included, is carried
    # over by the deep copy as-is and is not recomputed for the prefixed
    # routing key — confirm that is intended.
    current_key = prefixed.routing_key
    if current_key:
        prefixed.routing_key = prefix + current_key

    return prefixed