Skip to content

process_msg

faststream.broker.utils.process_msg async #

process_msg(msg, middlewares, parser, decoder)
Source code in faststream/broker/utils.py
async def process_msg(
    msg: Optional[MsgType],
    middlewares: Sequence["BrokerMiddleware[MsgType]"],
    parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
    decoder: Callable[["StreamMessage[MsgType]"], "Any"],
) -> Optional["StreamMessage[MsgType]"]:
    if msg is None:
        return None

    async with AsyncExitStack() as stack:
        return_msg: Callable[
            [StreamMessage[MsgType]],
            Awaitable[StreamMessage[MsgType]],
        ] = return_input

        for m in middlewares[::-1]:
            mid = m(msg)
            await stack.enter_async_context(mid)
            return_msg = partial(mid.consume_scope, return_msg)

        parsed_msg = await parser(msg)
        parsed_msg._decoded_body = await decoder(parsed_msg)
        return await return_msg(parsed_msg)

    raise AssertionError("unreachable")