Skip to content

StreamMessage

faststream.broker.fastapi.StreamMessage #

StreamMessage(
    *,
    body: Union[AnyDict, List[Any]],
    headers: AnyDict,
    path: AnyDict
)

Bases: Request

A class to represent a stream message.

METHOD DESCRIPTION
__init__

initializes the StreamMessage object

get_session

returns a callable function that handles the session of the message

Initialize a class instance.

PARAMETER DESCRIPTION
body

The body of the request as a dictionary.

TYPE: Union[AnyDict, List[Any]]

headers

The headers of the request as a dictionary.

TYPE: AnyDict

path

The path of the request as a dictionary.

TYPE: AnyDict

Source code in faststream/broker/fastapi/route.py
def __init__(
    self,
    *,
    body: Union[AnyDict, List[Any]],
    headers: AnyDict,
    path: AnyDict,
) -> None:
    """Initialize a class instance.

    Args:
        body: The body of the request as a dictionary.
        headers: The headers of the request as a dictionary.
        path: The path of the request as a dictionary.

    Attributes:
        scope: A dictionary to store the scope of the request.
        _cookies: A dictionary to store the cookies of the request.
        _headers: A dictionary to store the headers of the request.
        _body: A dictionary to store the body of the request.
        _query_params: A dictionary to store the query parameters of the request.

    """
    self._headers = headers
    self._body = body
    self._query_params = path

    self.scope = {"path_params": self._query_params}
    self._cookies = {}

app property #

app: Any

auth property #

auth: Any

base_url property #

base_url: URL

client property #

client: Optional[Address]

cookies property #

cookies: Dict[str, str]

headers property #

headers: Headers

method property #

method: str

path_params property #

path_params: Dict[str, Any]

query_params property #

query_params: QueryParams

receive property #

receive: Receive

scope instance-attribute #

scope: AnyDict = {'path_params': _query_params}

session property #

session: Dict[str, Any]

state property #

state: State

url property #

url: URL

user property #

user: Any

body async #

body() -> bytes
Source code in starlette/requests.py
async def body(self) -> bytes:
    if not hasattr(self, "_body"):
        chunks: "typing.List[bytes]" = []
        async for chunk in self.stream():
            chunks.append(chunk)
        self._body = b"".join(chunks)
    return self._body

close async #

close() -> None
Source code in starlette/requests.py
async def close(self) -> None:
    if self._form is not None:
        await self._form.close()

form #

form(
    *,
    max_files: Union[int, float] = 1000,
    max_fields: Union[int, float] = 1000
) -> AwaitableOrContextManager[FormData]
Source code in starlette/requests.py
def form(
    self,
    *,
    max_files: typing.Union[int, float] = 1000,
    max_fields: typing.Union[int, float] = 1000,
) -> AwaitableOrContextManager[FormData]:
    return AwaitableOrContextManagerWrapper(
        self._get_form(max_files=max_files, max_fields=max_fields)
    )

get_session classmethod #

get_session(
    dependant: Dependant,
    dependency_overrides_provider: Optional[Any] = None,
) -> Callable[
    [StreamMessage[Any]], Awaitable[SendableMessage]
]

Creates a session for handling requests.

PARAMETER DESCRIPTION
dependant

The dependant object representing the session.

TYPE: Dependant

dependency_overrides_provider

Optional provider for dependency overrides.

TYPE: Optional[Any] DEFAULT: None

RETURNS DESCRIPTION
Callable[[StreamMessage[Any]], Awaitable[SendableMessage]]

A callable that takes a native message and returns an awaitable sendable message.

RAISES DESCRIPTION
AssertionError

If the dependant call is not defined.

Note

This function is used to create a session for handling requests. It takes a dependant object, which represents the session, and a dependency overrides provider, which allows for overriding dependencies. It returns a callable that takes a native message and returns an awaitable sendable message. The session is created based on the dependant object and the message passed to the callable. The session is then used to call the function obtained from the dependant object, and the result is returned.

Source code in faststream/broker/fastapi/route.py
@classmethod
def get_session(
    cls,
    dependant: Dependant,
    dependency_overrides_provider: Optional[Any] = None,
) -> Callable[[NativeMessage[Any]], Awaitable[SendableMessage]]:
    """Creates a session for handling requests.

    Args:
        dependant: The dependant object representing the session.
        dependency_overrides_provider: Optional provider for dependency overrides.

    Returns:
        A callable that takes a native message and returns an awaitable sendable message.

    Raises:
        AssertionError: If the dependant call is not defined.

    Note:
        This function is used to create a session for handling requests. It takes a dependant object, which represents the session, and a dependency overrides provider, which allows for overriding dependencies. It returns a callable that takes a native message and returns an awaitable sendable message. The session is created based on the dependant object and the message passed to the callable. The session is then used to call the function obtained from the dependant object, and the result is returned.
    """
    assert dependant.call  # nosec B101

    func = get_app(dependant, dependency_overrides_provider)

    dependencies_names = tuple(i.name for i in dependant.dependencies)

    first_arg = next(
        dropwhile(
            lambda i: i in dependencies_names,
            inspect.signature(dependant.call).parameters,
        ),
        None,
    )

    async def app(message: NativeMessage[Any]) -> SendableMessage:
        """An asynchronous function that processes an incoming message and returns a sendable message.

        Args:
            message : The incoming message to be processed

        Returns:
            The sendable message

        Raises:
            TypeError: If the body of the message is not a dictionary
        !!! note

            The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
        """
        body = message.decoded_body

        fastapi_body: Union[AnyDict, List[Any]]
        if first_arg is not None:
            if isinstance(body, dict):
                path = fastapi_body = body or {}
            elif isinstance(body, list):
                fastapi_body, path = body, {}
            else:
                path = fastapi_body = {first_arg: body}

            session = cls(
                body=fastapi_body,
                headers=message.headers,
                path={**path, **message.path},
            )

        else:
            session = cls(
                body={},
                headers={},
                path={},
            )

        return await func(session)

    return app

is_disconnected async #

is_disconnected() -> bool
Source code in starlette/requests.py
async def is_disconnected(self) -> bool:
    if not self._is_disconnected:
        message: Message = {}

        # If message isn't immediately available, move on
        with anyio.CancelScope() as cs:
            cs.cancel()
            message = await self._receive()

        if message.get("type") == "http.disconnect":
            self._is_disconnected = True

    return self._is_disconnected

json async #

json() -> Any
Source code in starlette/requests.py
async def json(self) -> typing.Any:
    if not hasattr(self, "_json"):
        body = await self.body()
        self._json = json.loads(body)
    return self._json

send_push_promise async #

send_push_promise(path: str) -> None
Source code in starlette/requests.py
async def send_push_promise(self, path: str) -> None:
    if "http.response.push" in self.scope.get("extensions", {}):
        raw_headers: "typing.List[typing.Tuple[bytes, bytes]]" = []
        for name in SERVER_PUSH_HEADERS_TO_COPY:
            for value in self.headers.getlist(name):
                raw_headers.append(
                    (name.encode("latin-1"), value.encode("latin-1"))
                )
        await self._send(
            {"type": "http.response.push", "path": path, "headers": raw_headers}
        )

stream async #

stream() -> AsyncGenerator[bytes, None]
Source code in starlette/requests.py
async def stream(self) -> typing.AsyncGenerator[bytes, None]:
    if hasattr(self, "_body"):
        yield self._body
        yield b""
        return
    if self._stream_consumed:
        raise RuntimeError("Stream consumed")
    while not self._stream_consumed:
        message = await self._receive()
        if message["type"] == "http.request":
            body = message.get("body", b"")
            if not message.get("more_body", False):
                self._stream_consumed = True
            if body:
                yield body
        elif message["type"] == "http.disconnect":
            self._is_disconnected = True
            raise ClientDisconnect()
    yield b""

url_for #

url_for(name: str, /, **path_params: Any) -> URL
Source code in starlette/requests.py
def url_for(self, name: str, /, **path_params: typing.Any) -> URL:
    router: Router = self.scope["router"]
    url_path = router.url_path_for(name, **path_params)
    return url_path.make_absolute_url(base_url=self.base_url)