Skip to content

get_app_broker_channels

faststream.asyncapi.generate.get_app_broker_channels #

get_app_broker_channels(
    app: Union[FastStream, StreamRouter[Any]]
) -> Dict[str, Channel]

Get the broker channels for an application.

PARAMETER DESCRIPTION
app

An instance of FastStream or StreamRouter.

TYPE: Union[FastStream, StreamRouter[Any]]

RETURNS DESCRIPTION
Dict[str, Channel]

A dictionary of channel names and their corresponding Channel objects.

RAISES DESCRIPTION
AssertionError

If the app does not have a broker.

Source code in faststream/asyncapi/generate.py
def get_app_broker_channels(
    app: Union[FastStream, "StreamRouter[Any]"],
) -> Dict[str, Channel]:
    """Get the broker channels for an application.

    Args:
        app: An instance of FastStream or StreamRouter.

    Returns:
        A dictionary of channel names and their corresponding Channel objects.

    Raises:
        AssertionError: If the app does not have a broker.

    """
    channels = {}
    assert app.broker  # nosec B101

    for h in app.broker.handlers.values():
        channels.update(h.schema())

    for p in app.broker._publishers.values():
        channels.update(p.schema())

    return channels