Skip to content

get_app

faststream.broker.fastapi.route.get_app #

get_app(
    dependant: Dependant,
    dependency_overrides_provider: Optional[Any] = None,
) -> Callable[
    [StreamMessage], Coroutine[Any, Any, SendableMessage]
]

Creates a FastAPI application.

PARAMETER DESCRIPTION
dependant

The dependant object that defines the endpoint function and its dependencies.

TYPE: Dependant

dependency_overrides_provider

Optional provider for dependency overrides.

TYPE: Optional[Any] DEFAULT: None

RETURNS DESCRIPTION
Callable[[StreamMessage], Coroutine[Any, Any, SendableMessage]]

The FastAPI application as a callable that takes a StreamMessage object as input and returns a SendableMessage coroutine.

RAISES DESCRIPTION
AssertionError

If the code reaches an unreachable state.

Source code in faststream/broker/fastapi/route.py
def get_app(
    dependant: Dependant,
    dependency_overrides_provider: Optional[Any] = None,
) -> Callable[
    [StreamMessage],
    Coroutine[Any, Any, SendableMessage],
]:
    """Creates a FastAPI application.

    Args:
        dependant: The dependant object that defines the endpoint function and its dependencies.
        dependency_overrides_provider: Optional provider for dependency overrides.

    Returns:
        The FastAPI application as a callable that takes a StreamMessage object as input and returns a SendableMessage coroutine.

    Raises:
        AssertionError: If the code reaches an unreachable state.
    """

    async def app(request: StreamMessage) -> SendableMessage:
        """Handle an HTTP request and return a response.

        Args:
            request: The incoming HTTP request.

        Returns:
            The response to be sent back to the client.

        Raises:
            AssertionError: If the code reaches an unreachable point.
        """
        async with AsyncExitStack() as stack:
            if FASTAPI_V106:
                kwargs = {"async_exit_stack": stack}
            else:
                request.scope["fastapi_astack"] = stack
                kwargs = {}

            solved_result = await solve_dependencies(
                request=request,
                body=request._body,  # type: ignore[arg-type]
                dependant=dependant,
                dependency_overrides_provider=dependency_overrides_provider,
                **kwargs,  # type: ignore[arg-type]
            )

            values, errors, _, _2, _3 = solved_result
            if errors:
                raise_fastapi_validation_error(errors, request._body)  # type: ignore[arg-type]

            return cast(
                SendableMessage,
                await run_endpoint_function(
                    dependant=dependant,
                    values=values,
                    is_coroutine=asyncio.iscoroutinefunction(dependant.call),
                ),
            )

        raise AssertionError("unreachable")

    return app