async def process_msg(
    msg: Optional[MsgType],
    middlewares: Sequence["BrokerMiddleware[MsgType]"],
    parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
    decoder: Callable[["StreamMessage[MsgType]"], "Any"],
) -> Optional["StreamMessage[MsgType]"]:
    """Parse and decode a raw message, then pass it through the middlewares.

    Args:
        msg: The raw incoming message; ``None`` short-circuits to ``None``.
        middlewares: Callables invoked as ``m(msg)``. Each result is entered
            as an async context manager and its ``consume_scope`` is wrapped
            around the rest of the chain.
        parser: Awaited with the raw ``msg`` to build the parsed message.
        decoder: Awaited with the parsed message; the result is stored on the
            parsed message's ``_decoded_body`` attribute.

    Returns:
        ``None`` when ``msg`` is ``None``; otherwise whatever the composed
        ``consume_scope`` chain returns for the parsed message.

    Raises:
        AssertionError: If the ``async with`` block is left without returning,
            which can only happen when a context manager on the exit stack
            suppresses an exception raised inside the block.
    """
    if msg is None:
        return None

    async with AsyncExitStack() as exit_stack:
        # Innermost link of the chain.
        # NOTE(review): `return_input` is defined outside this block;
        # presumably it hands its argument back unchanged - confirm.
        call_next: Callable[
            [StreamMessage[MsgType]],
            Awaitable[StreamMessage[MsgType]],
        ] = return_input

        # Walk a reversed copy so that the first middleware in `middlewares`
        # is wrapped last and therefore ends up outermost in the call chain.
        for make_middleware in middlewares[::-1]:
            middleware = make_middleware(msg)
            await exit_stack.enter_async_context(middleware)
            call_next = partial(middleware.consume_scope, call_next)

        message = await parser(msg)
        decoded_body = await decoder(message)
        message._decoded_body = decoded_body

        # Return from inside the `async with` so the middleware contexts are
        # still active while the chain runs and are unwound afterwards.
        consumed = await call_next(message)
        return consumed

    # Only reached if the exit stack swallowed an exception from the block.
    raise AssertionError("unreachable")