Skip to content

build_faststream_to_fastapi_parser

faststream.broker.fastapi.route.build_faststream_to_fastapi_parser #

build_faststream_to_fastapi_parser(*, dependent, provider_factory, response_field, response_model_include, response_model_exclude, response_model_by_alias, response_model_exclude_unset, response_model_exclude_defaults, response_model_exclude_none)

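Builds an async wrapper that decodes a FastStream message, repackages it as a FastAPI-compatible `StreamMessage`, and passes it to the handler executor.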

Source code in faststream/broker/fastapi/route.py
def build_faststream_to_fastapi_parser(
    *,
    dependent: "Dependant",
    provider_factory: Callable[[], Any],
    response_field: Optional["ModelField"],
    response_model_include: Optional["IncEx"],
    response_model_exclude: Optional["IncEx"],
    response_model_by_alias: bool,
    response_model_exclude_unset: bool,
    response_model_exclude_defaults: bool,
    response_model_exclude_none: bool,
) -> Callable[["NativeMessage[Any]"], Awaitable[Any]]:
    """Build an async adapter from a FastStream message to a FastAPI-style call.

    The executor is created once, up front, by ``make_fastapi_execution``.
    The returned coroutine function decodes each incoming message, repackages
    it as a ``StreamMessage`` and awaits that executor.

    Args:
        dependent: Dependant describing the handler; ``dependent.call`` must
            be set, and ``dependent.dependencies`` supplies the names that
            are skipped when looking for the body parameter.
        provider_factory: Forwarded unchanged to ``make_fastapi_execution``.
        response_field: Forwarded unchanged to ``make_fastapi_execution``.
        response_model_include: Forwarded unchanged to
            ``make_fastapi_execution``.
        response_model_exclude: Forwarded unchanged to
            ``make_fastapi_execution``.
        response_model_by_alias: Forwarded unchanged to
            ``make_fastapi_execution``.
        response_model_exclude_unset: Forwarded unchanged to
            ``make_fastapi_execution``.
        response_model_exclude_defaults: Forwarded unchanged to
            ``make_fastapi_execution``.
        response_model_exclude_none: Forwarded unchanged to
            ``make_fastapi_execution``.

    Returns:
        An async callable that takes the native message and returns whatever
        the executor returns for it.
    """
    assert dependent.call  # nosec B101

    execute = make_fastapi_execution(
        dependent=dependent,
        provider_factory=provider_factory,
        response_field=response_field,
        response_model_include=response_model_include,
        response_model_exclude=response_model_exclude,
        response_model_by_alias=response_model_by_alias,
        response_model_exclude_unset=response_model_exclude_unset,
        response_model_exclude_defaults=response_model_exclude_defaults,
        response_model_exclude_none=response_model_exclude_none,
    )

    dependency_names = tuple(dep.name for dep in dependent.dependencies)

    def _is_dependency(param_name: str) -> bool:
        """Tell whether a handler parameter name belongs to a dependency."""
        return param_name in dependency_names

    # Skip the leading parameters named after dependencies; the first one
    # left (if any) is the name a non-container body gets wrapped under.
    handler_params = inspect.signature(dependent.call).parameters
    first_arg = next(dropwhile(_is_dependency, handler_params), None)

    async def parsed_consumer(message: "NativeMessage[Any]") -> Any:
        """Convert a FastStream message into a FastAPI-compatible one and run it."""
        # The body is always decoded, even when the handler has no body argument.
        decoded = await message.decode()

        if first_arg is None:
            # No non-dependency parameter: the executor gets an empty message;
            # the incoming headers and path are not forwarded in this case.
            empty_message = StreamMessage(
                body={},
                headers={},
                path={},
            )
            return await execute(empty_message, message)

        fastapi_body: Union[AnyDict, List[Any]]
        if isinstance(decoded, dict):
            # A mapping serves both as the body and as the base path values.
            fastapi_body = path_values = decoded or {}
        elif isinstance(decoded, list):
            # A list is passed as the body only; it adds no path values.
            fastapi_body = decoded
            path_values = {}
        else:
            # Anything else is wrapped under the first non-dependency argument.
            fastapi_body = path_values = {first_arg: decoded}

        stream_message = StreamMessage(
            body=fastapi_body,
            headers=message.headers,
            # Keys from ``message.path`` win over body-derived keys on collision.
            path={**path_values, **message.path},
        )
        return await execute(stream_message, message)

    return parsed_consumer