Skip to content

RabbitExchange

faststream.rabbit.schemas.exchange.RabbitExchange #

RabbitExchange(name='', type=ExchangeType.DIRECT, durable=False, auto_delete=False, passive=False, arguments=None, timeout=None, robust=True, bind_to=None, bind_arguments=None, routing_key='')

Bases: NameRequired

A class to represent a RabbitMQ exchange.

Initialize a RabbitExchange object.

Source code in faststream/rabbit/schemas/exchange.py
def __init__(
    self,
    name: Annotated[
        str,
        Doc("RabbitMQ exchange name."),
    ] = "",
    type: Annotated[
        ExchangeType,
        Doc(
            "RabbitMQ exchange type. "
            "You can find detail information in the official RabbitMQ documentation: "
            "https://www.rabbitmq.com/tutorials/amqp-concepts#exchanges"
            "\n"
            "Or in the FastStream one: "
            "https://faststream.airt.ai/latest/rabbit/examples/"
        ),
    ] = ExchangeType.DIRECT,
    durable: Annotated[
        bool,
        Doc("Whether the object is durable."),
    ] = False,
    auto_delete: Annotated[
        bool,
        Doc("The exchange will be deleted after connection closed."),
    ] = False,
    passive: Annotated[
        bool,
        Doc("Do not create exchange automatically."),
    ] = False,
    arguments: Annotated[
        Optional[AnyDict],
        Doc(
            "Exchange declarationg arguments. "
            "You can find usage example in the official RabbitMQ documentation: "
            "https://www.rabbitmq.com/docs/ae"
        ),
    ] = None,
    timeout: Annotated[
        "TimeoutType",
        Doc("Send confirmation time from RabbitMQ."),
    ] = None,
    robust: Annotated[
        bool,
        Doc("Whether to declare exchange object as restorable."),
    ] = True,
    bind_to: Annotated[
        Optional["RabbitExchange"],
        Doc(
            "Another `RabbitExchange` object to bind the current one to. "
            "You can find more information in the official RabbitMQ blog post: "
            "https://www.rabbitmq.com/blog/2010/10/19/exchange-to-exchange-bindings"
        ),
    ] = None,
    bind_arguments: Annotated[
        Optional[AnyDict],
        Doc("Exchange-exchange binding options."),
    ] = None,
    routing_key: Annotated[
        str,
        Doc("Explicit binding routing key."),
    ] = "",
) -> None:
    """Initialize a RabbitExchange object."""
    if routing_key and bind_to is None:  # pragma: no cover
        warnings.warn(
            (
                "\nRabbitExchange `routing_key` is using to bind exchange to another one."
                "\nIt can be used only with the `bind_to` argument, please setup it too."
            ),
            category=RuntimeWarning,
            stacklevel=1,
        )

    super().__init__(name)

    self.type = type
    self.durable = durable
    self.auto_delete = auto_delete
    self.robust = robust
    self.passive = passive
    self.timeout = timeout
    self.arguments = arguments

    self.bind_to = bind_to
    self.bind_arguments = bind_arguments
    self.routing_key = routing_key

name instance-attribute #

name = name

routing property #

routing

Return real routing_key of object.

type instance-attribute #

type = type

durable instance-attribute #

durable = durable

auto_delete instance-attribute #

auto_delete = auto_delete

robust instance-attribute #

robust = robust

passive instance-attribute #

passive = passive

timeout instance-attribute #

timeout = timeout

arguments instance-attribute #

arguments = arguments

bind_to instance-attribute #

bind_to = bind_to

bind_arguments instance-attribute #

bind_arguments = bind_arguments

routing_key instance-attribute #

routing_key = routing_key

validate classmethod #

validate(value, **kwargs)
Source code in faststream/rabbit/schemas/exchange.py
@override
@classmethod
def validate(  # type: ignore[override]
    cls,
    value: Union[str, "RabbitExchange", None],
    **kwargs: Any,
) -> "RabbitExchange":
    exch = super().validate(value, **kwargs)
    if exch is None:
        exch = RabbitExchange()
    return exch