Skip to content

ChannelManager

faststream.rabbit.helpers.ChannelManager #

ChannelManager(connection, *, default_channel)
Source code in faststream/rabbit/helpers/channel_manager.py
def __init__(
    self,
    connection: "aio_pika.RobustConnection",
    *,
    default_channel: "Channel",
) -> None:
    """Store the connection and default channel; start with an empty cache.

    Args:
        connection: Connection on which ``get_channel`` opens channels.
        default_channel: Channel settings that ``get_channel`` falls back
            to when it is called without an explicit channel.
    """
    # Opened channels keyed by their settings object; filled by get_channel.
    self.__channels: Dict[Channel, aio_pika.RobustChannel] = {}
    self.__default_channel = default_channel
    self.__connection = connection

get_channel async #

get_channel(channel=None)

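Return the channel opened for the given channel settings, opening and caching it on first use. The default channel is used when none is given.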

Source code in faststream/rabbit/helpers/channel_manager.py
async def get_channel(
    self,
    channel: Optional["Channel"] = None,
) -> "aio_pika.RobustChannel":
    """Return the opened channel for ``channel``, opening it on first use.

    Args:
        channel: Channel settings to look up. When ``None``, the default
            channel given to the constructor is used instead.

    Returns:
        The channel already cached for these settings, or a newly opened
        one. A new channel gets ``set_qos`` applied when the settings
        carry a truthy ``prefetch_count``.
    """
    settings = self.__default_channel if channel is None else channel

    cached = self.__channels.get(settings)
    if cached is not None:
        # Already opened for these settings: reuse it untouched.
        return cached

    opened = cast(
        "aio_pika.RobustChannel",
        await self.__connection.channel(
            channel_number=settings.channel_number,
            publisher_confirms=settings.publisher_confirms,
            on_return_raises=settings.on_return_raises,
        ),
    )
    # The channel is cached before QoS is configured.
    self.__channels[settings] = opened

    if settings.prefetch_count:
        await opened.set_qos(prefetch_count=settings.prefetch_count)

    return opened