Skip to content

make_fastapi_execution

faststream.broker.fastapi.route.make_fastapi_execution #

make_fastapi_execution(*, dependent, provider_factory, response_field, response_model_include, response_model_exclude, response_model_by_alias, response_model_exclude_unset, response_model_exclude_defaults, response_model_exclude_none)

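Creates the async callable that resolves the FastAPI dependencies of a handler, runs the handler for an incoming stream message, and returns the serialized response.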

Source code in faststream/broker/fastapi/route.py
def make_fastapi_execution(
    *,
    dependent: "Dependant",
    provider_factory: Callable[[], Any],
    response_field: Optional["ModelField"],
    response_model_include: Optional["IncEx"],
    response_model_exclude: Optional["IncEx"],
    response_model_by_alias: bool,
    response_model_exclude_unset: bool,
    response_model_exclude_defaults: bool,
    response_model_exclude_none: bool,
) -> Callable[
    ["StreamMessage", "NativeMessage[Any]"],
    Awaitable[Response],
]:
    """Build the coroutine function that executes ``dependent`` for a message.

    The returned callable solves the dependencies of ``dependent`` against the
    incoming request, runs the endpoint function with the solved values and
    passes the response body through ``serialize_response`` using the
    ``response_*`` options given here.

    Args:
        dependent: Dependant describing the endpoint; ``dependent.call`` is
            inspected once to learn whether it is a coroutine function.
        provider_factory: Zero-argument callable invoked on every message to
            obtain the dependency overrides provider.
        response_field: Forwarded to ``serialize_response`` as ``field``.
        response_model_include: Forwarded as ``include``.
        response_model_exclude: Forwarded as ``exclude``.
        response_model_by_alias: Forwarded as ``by_alias``.
        response_model_exclude_unset: Forwarded as ``exclude_unset``.
        response_model_exclude_defaults: Forwarded as ``exclude_defaults``.
        response_model_exclude_none: Forwarded as ``exclude_none``.

    Returns:
        An async callable taking the stream message and the raw native message
        and returning the response.
    """
    # Inspected once here and reused for every processed message.
    is_coroutine = asyncio.iscoroutinefunction(dependent.call)

    async def app(
        request: "StreamMessage",
        raw_message: "NativeMessage[Any]",  # to support BackgroundTasks by middleware
    ) -> Response:
        """Handle one StreamMessage and return the endpoint's response."""
        async with AsyncExitStack() as exit_stack:
            # How the exit stack reaches the dependency solver depends on the
            # FASTAPI_V106 flag: either as a keyword argument or via the scope.
            if not FASTAPI_V106:
                request.scope["fastapi_astack"] = exit_stack
                stack_kwargs = {}
            else:
                stack_kwargs = {"async_exit_stack": exit_stack}

            overrides_provider = provider_factory()
            solved = await solve_faststream_dependency(
                request=request,
                dependant=dependent,
                dependency_overrides_provider=overrides_provider,
                **stack_kwargs,
            )

            # Attach the background tasks before the validation check below.
            raw_message.background = solved.background_tasks  # type: ignore[attr-defined]

            validation_errors = solved.errors
            if validation_errors:
                raise_fastapi_validation_error(validation_errors, request._body)  # type: ignore[arg-type]

            response = ensure_response(
                await run_endpoint_function(
                    dependant=dependent,
                    values=solved.values,
                    is_coroutine=is_coroutine,
                )
            )

            serialized_body = await serialize_response(
                response_content=response.body,
                field=response_field,
                include=response_model_include,
                exclude=response_model_exclude,
                by_alias=response_model_by_alias,
                exclude_unset=response_model_exclude_unset,
                exclude_defaults=response_model_exclude_defaults,
                exclude_none=response_model_exclude_none,
                is_coroutine=is_coroutine,
            )
            response.body = serialized_body

            return response

        raise AssertionError("unreachable")

    return app