get_broker_channels

faststream.asyncapi.generate.get_broker_channels

get_broker_channels(broker)

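Get the broker channels for an application. The function calls schema() on every subscriber registered on the broker, then on every publisher, and merges the results into a single dictionary that maps channel names to Channel objects. Because the merge uses dict.update, a publisher entry replaces a subscriber entry that has the same key.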

Source code in faststream/asyncapi/generate.py
def get_broker_channels(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> Dict[str, Channel]:
    """Get the broker channels for an application."""
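    channels = {}

    # Each subscriber contributes a mapping of channel name -> Channel.
    for h in broker._subscribers.values():
        channels.update(h.schema())

    # Publishers are merged second, so on a key collision their entry wins.
    for p in broker._publishers.values():
        channels.update(p.schema())

    return channels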
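
The merge order in the listing above can be tried without installing FastStream. The following is a minimal sketch, not the library's code: `StubEndpoint`, `StubBroker`, and `collect_channels` are hypothetical stand-ins invented for illustration, the channel names are made up, and plain dictionaries take the place of the real `Channel` models. It only assumes what the listing shows, namely that the broker holds `_subscribers` and `_publishers` mappings whose values expose a `schema()` method.

```python
from typing import Any, Dict


class StubEndpoint:
    """Hypothetical stand-in for a subscriber or publisher object."""

    def __init__(self, channel_name: str, description: str) -> None:
        self.channel_name = channel_name
        self.description = description

    def schema(self) -> Dict[str, Any]:
        # The real objects return Channel models; a plain dict is used here.
        return {self.channel_name: {"description": self.description}}


class StubBroker:
    """Hypothetical broker exposing the two mappings the function reads."""

    def __init__(self) -> None:
        self._subscribers: Dict[int, StubEndpoint] = {}
        self._publishers: Dict[int, StubEndpoint] = {}


def collect_channels(broker: StubBroker) -> Dict[str, Any]:
    # Same merge order as the listing: subscribers first, then publishers.
    channels: Dict[str, Any] = {}
    for h in broker._subscribers.values():
        channels.update(h.schema())
    for p in broker._publishers.values():
        channels.update(p.schema())
    return channels


broker = StubBroker()
broker._subscribers[1] = StubEndpoint("orders:Handle", "subscriber side")
broker._publishers[1] = StubEndpoint("orders:Publisher", "publisher side")
# A publisher whose key collides with the subscriber registered above.
broker._publishers[2] = StubEndpoint("orders:Handle", "colliding publisher")

channels = collect_channels(broker)
print(sorted(channels))
print(channels["orders:Handle"]["description"])
```

Three endpoints are registered but only two keys survive, and the colliding key carries the publisher's description. Whether such collisions occur in practice depends on how the real subscribers and publishers name their channels, which this sketch does not model.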