async def declare_exchange(
self,
exchange: "RabbitExchange",
declare: bool = EMPTY,
passive: Annotated[
bool,
deprecated("Use `declare` instead. Will be removed in the 0.6.0 release."),
] = EMPTY,
*,
channel: Optional["Channel"] = None,
) -> "aio_pika.RobustExchange":
"""Declare an exchange, parent exchanges and bind them each other."""
channel_obj = await self.__channel_manager.get_channel(channel)
if not exchange.name:
return channel_obj.default_exchange
if (exch := self.__exchanges.get(exchange)) is None:
if passive is not EMPTY:
warnings.warn(
DeprecationWarning(
"Use `declare` instead. Will be removed in the 0.6.0 release.",
),
stacklevel=2,
)
declare = not passive
elif declare is EMPTY:
declare = exchange.declare
self.__exchanges[exchange] = exch = cast(
"aio_pika.RobustExchange",
await channel_obj.declare_exchange(
name=exchange.name,
type=exchange.type.value,
durable=exchange.durable,
auto_delete=exchange.auto_delete,
passive=not declare,
arguments=exchange.arguments,
timeout=exchange.timeout,
robust=exchange.robust,
internal=False, # deprecated RMQ option
),
)
if exchange.bind_to is not None:
parent = await self.declare_exchange(exchange.bind_to)
await exch.bind(
exchange=parent,
routing_key=exchange.routing,
arguments=exchange.bind_arguments,
timeout=exchange.timeout,
robust=exchange.robust,
)
return exch