def make_fastapi_execution(
*,
dependent: "Dependant",
provider_factory: Callable[[], Any],
response_field: Optional["ModelField"],
response_model_include: Optional["IncEx"],
response_model_exclude: Optional["IncEx"],
response_model_by_alias: bool,
response_model_exclude_unset: bool,
response_model_exclude_defaults: bool,
response_model_exclude_none: bool,
) -> Callable[
["StreamMessage", "NativeMessage[Any]"],
Awaitable[Response],
]:
"""Creates a FastAPI application."""
is_coroutine = asyncio.iscoroutinefunction(dependent.call)
async def app(
request: "StreamMessage",
raw_message: "NativeMessage[Any]", # to support BackgroundTasks by middleware
) -> Response:
"""Consume StreamMessage and return user function result."""
async with AsyncExitStack() as stack:
if FASTAPI_V106:
kwargs = {"async_exit_stack": stack}
else:
request.scope["fastapi_astack"] = stack
kwargs = {}
solved_result = await solve_faststream_dependency(
request=request,
dependant=dependent,
dependency_overrides_provider=provider_factory(),
**kwargs,
)
raw_message.background = solved_result.background_tasks # type: ignore[attr-defined]
if solved_result.errors:
raise_fastapi_validation_error(solved_result.errors, request._body) # type: ignore[arg-type]
function_result = await run_endpoint_function(
dependant=dependent,
values=solved_result.values,
is_coroutine=is_coroutine,
)
response = ensure_response(function_result)
response.body = await serialize_response(
response_content=response.body,
field=response_field,
include=response_model_include,
exclude=response_model_exclude,
by_alias=response_model_by_alias,
exclude_unset=response_model_exclude_unset,
exclude_defaults=response_model_exclude_defaults,
exclude_none=response_model_exclude_none,
is_coroutine=is_coroutine,
)
return response
raise AssertionError("unreachable")
return app