def build_faststream_to_fastapi_parser(
*,
dependent: "Dependant",
provider_factory: Callable[[], Any],
response_field: Optional["ModelField"],
response_model_include: Optional["IncEx"],
response_model_exclude: Optional["IncEx"],
response_model_by_alias: bool,
response_model_exclude_unset: bool,
response_model_exclude_defaults: bool,
response_model_exclude_none: bool,
) -> Callable[["NativeMessage[Any]"], Awaitable[Any]]:
"""Creates a session for handling requests."""
assert dependent.call # nosec B101
consume = make_fastapi_execution(
dependent=dependent,
provider_factory=provider_factory,
response_field=response_field,
response_model_include=response_model_include,
response_model_exclude=response_model_exclude,
response_model_by_alias=response_model_by_alias,
response_model_exclude_unset=response_model_exclude_unset,
response_model_exclude_defaults=response_model_exclude_defaults,
response_model_exclude_none=response_model_exclude_none,
)
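
    # Parameters that correspond to FastAPI dependencies are skipped while
    # searching for the first "real" handler argument; that name is later used
    # as the body key for non-dict payloads.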
dependencies_names = tuple(i.name for i in dependent.dependencies)
first_arg = next(
dropwhile(
lambda i: i in dependencies_names,
inspect.signature(dependent.call).parameters,
),
None,
)

    async def parsed_consumer(message: "NativeMessage[Any]") -> Any:
        """Parse a FastStream message into a FastAPI-compatible one and consume it."""
body = await message.decode()
fastapi_body: Union[AnyDict, List[Any]]
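        # Normalize the decoded payload: dicts double as both body and path
        # params, lists keep an empty path, and scalar values are wrapped under
        # the handler's first argument name.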
if first_arg is not None:
if isinstance(body, dict):
path = fastapi_body = body or {}
elif isinstance(body, list):
fastapi_body, path = body, {}
else:
path = fastapi_body = {first_arg: body}
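
            # Wrap the normalized payload in a Request-like StreamMessage so
            # FastAPI's dependency resolution can process it.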
stream_message = StreamMessage(
body=fastapi_body,
headers=message.headers,
path={**path, **message.path},
)
else:
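            # The handler declares no message parameters beyond its
            # dependencies, so pass an empty request downstream.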
stream_message = StreamMessage(
body={},
headers={},
path={},
)

        return await consume(stream_message, message)

    return parsed_consumer