Skip to content

StreamMessage

faststream.broker.fastapi.StreamMessage #

StreamMessage(
    body: Optional[AnyDict] = None,
    headers: Optional[AnyDict] = None,
    path: Optional[AnyDict] = None,
)

Bases: Request

A class to represent a stream message.

METHOD DESCRIPTION
__init__

initializes the StreamMessage object

get_session

returns a callable function that handles the session of the message

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Initialize a class instance.

PARAMETER DESCRIPTION
body

The body of the request as a dictionary. Default is None.

TYPE: Optional[AnyDict] DEFAULT: None

headers

The headers of the request as a dictionary. Default is None.

TYPE: Optional[AnyDict] DEFAULT: None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/fastapi/route.py
def __init__(
    self,
    body: Optional[AnyDict] = None,
    headers: Optional[AnyDict] = None,
    path: Optional[AnyDict] = None,
):
    """Initialize a class instance.

    Args:
        body: The body of the request as a dictionary. Default is None.
        headers: The headers of the request as a dictionary. Default is None.

    Attributes:
        scope: A dictionary to store the scope of the request.
        _cookies: A dictionary to store the cookies of the request.
        _headers: A dictionary to store the headers of the request.
        _body: A dictionary to store the body of the request.
        _query_params: A dictionary to store the query parameters of the request.
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    self.scope = {"path_params": path or {}}
    self._cookies = {}
    self._headers = headers or {}
    self._body = body or {}
    self._query_params = {**self._body, **(path or {})}

app property #

app: typing.Any

auth property #

auth: typing.Any

base_url property #

base_url: URL

client property #

client: typing.Optional[Address]

cookies property #

cookies: typing.Dict[str, str]

headers property #

headers: Headers

method property #

method: str

path_params property #

path_params: typing.Dict[str, typing.Any]

query_params property #

query_params: QueryParams

receive property #

receive: Receive

scope instance-attribute #

scope: AnyDict = {'path_params': path or {}}

session property #

session: typing.Dict[str, typing.Any]

state property #

state: State

url property #

url: URL

user property #

user: typing.Any

body async #

body() -> bytes
Source code in starlette/requests.py
async def body(self) -> bytes:
    if not hasattr(self, "_body"):
        chunks: "typing.List[bytes]" = []
        async for chunk in self.stream():
            chunks.append(chunk)
        self._body = b"".join(chunks)
    return self._body

close async #

close() -> None
Source code in starlette/requests.py
async def close(self) -> None:
    if self._form is not None:
        await self._form.close()

form #

form(
    *,
    max_files: typing.Union[int, float] = 1000,
    max_fields: typing.Union[int, float] = 1000
) -> AwaitableOrContextManager[FormData]
Source code in starlette/requests.py
def form(
    self,
    *,
    max_files: typing.Union[int, float] = 1000,
    max_fields: typing.Union[int, float] = 1000,
) -> AwaitableOrContextManager[FormData]:
    return AwaitableOrContextManagerWrapper(
        self._get_form(max_files=max_files, max_fields=max_fields)
    )

get_session classmethod #

get_session(
    dependant: Dependant,
    dependency_overrides_provider: Optional[Any] = None,
) -> Callable[
    [NativeMessage[Any]], Awaitable[SendableMessage]
]

Creates a session for handling requests.

PARAMETER DESCRIPTION
dependant

The dependant object representing the session.

TYPE: Dependant

dependency_overrides_provider

Optional provider for dependency overrides.

TYPE: Optional[Any] DEFAULT: None

RETURNS DESCRIPTION
Callable[[StreamMessage[Any]], Awaitable[SendableMessage]]

A callable that takes a native message and returns an awaitable sendable message.

RAISES DESCRIPTION
AssertionError

If the dependant call is not defined.

Note

This function is used to create a session for handling requests. It takes a dependant object, which represents the session, and a dependency overrides provider, which allows for overriding dependencies. It returns a callable that takes a native message and returns an awaitable sendable message. The session is created based on the dependant object and the message passed to the callable. The session is then used to call the function obtained from the dependant object, and the result is returned.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/fastapi/route.py
@classmethod
def get_session(
    cls,
    dependant: Dependant,
    dependency_overrides_provider: Optional[Any] = None,
) -> Callable[[NativeMessage[Any]], Awaitable[SendableMessage]]:
    """Creates a session for handling requests.

    Args:
        dependant: The dependant object representing the session.
        dependency_overrides_provider: Optional provider for dependency overrides.

    Returns:
        A callable that takes a native message and returns an awaitable sendable message.

    Raises:
        AssertionError: If the dependant call is not defined.

    Note:
        This function is used to create a session for handling requests. It takes a dependant object, which represents the session, and a dependency overrides provider, which allows for overriding dependencies. It returns a callable that takes a native message and returns an awaitable sendable message. The session is created based on the dependant object and the message passed to the callable. The session is then used to call the function obtained from the dependant object, and the result is returned.
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    assert dependant.call  # nosec B101

    func = get_app(dependant, dependency_overrides_provider)

    dependencies_names = tuple(i.name for i in dependant.dependencies)

    first_arg = next(
        dropwhile(
            lambda i: i in dependencies_names,
            inspect.signature(dependant.call).parameters,
        ),
        None,
    )

    async def app(message: NativeMessage[Any]) -> SendableMessage:
        """An asynchronous function that processes an incoming message and returns a sendable message.

        Args:
            message : The incoming message to be processed

        Returns:
            The sendable message

        Raises:
            TypeError: If the body of the message is not a dictionary
        !!! note

            The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
        """
        body = message.decoded_body
        if first_arg is not None:
            if not isinstance(body, dict) and not isinstance(body, list):
                fastapi_body: Any = {first_arg: body}
            else:
                fastapi_body = body

            session = cls(fastapi_body, message.headers, message.path)
        else:
            session = cls()
        return await func(session)

    return app

is_disconnected async #

is_disconnected() -> bool
Source code in starlette/requests.py
async def is_disconnected(self) -> bool:
    if not self._is_disconnected:
        message: Message = {}

        # If message isn't immediately available, move on
        with anyio.CancelScope() as cs:
            cs.cancel()
            message = await self._receive()

        if message.get("type") == "http.disconnect":
            self._is_disconnected = True

    return self._is_disconnected

json async #

json() -> typing.Any
Source code in starlette/requests.py
async def json(self) -> typing.Any:
    if not hasattr(self, "_json"):
        body = await self.body()
        self._json = json.loads(body)
    return self._json

send_push_promise async #

send_push_promise(path: str) -> None
Source code in starlette/requests.py
async def send_push_promise(self, path: str) -> None:
    if "http.response.push" in self.scope.get("extensions", {}):
        raw_headers: "typing.List[typing.Tuple[bytes, bytes]]" = []
        for name in SERVER_PUSH_HEADERS_TO_COPY:
            for value in self.headers.getlist(name):
                raw_headers.append(
                    (name.encode("latin-1"), value.encode("latin-1"))
                )
        await self._send(
            {"type": "http.response.push", "path": path, "headers": raw_headers}
        )

stream async #

stream() -> typing.AsyncGenerator[bytes, None]
Source code in starlette/requests.py
async def stream(self) -> typing.AsyncGenerator[bytes, None]:
    if hasattr(self, "_body"):
        yield self._body
        yield b""
        return
    if self._stream_consumed:
        raise RuntimeError("Stream consumed")
    self._stream_consumed = True
    while True:
        message = await self._receive()
        if message["type"] == "http.request":
            body = message.get("body", b"")
            if body:
                yield body
            if not message.get("more_body", False):
                break
        elif message["type"] == "http.disconnect":
            self._is_disconnected = True
            raise ClientDisconnect()
    yield b""

url_for #

url_for(__name: str, **path_params: typing.Any) -> URL
Source code in starlette/requests.py
def url_for(self, __name: str, **path_params: typing.Any) -> URL:
    router: Router = self.scope["router"]
    url_path = router.url_path_for(__name, **path_params)
    return url_path.make_absolute_url(base_url=self.base_url)

Last update: 2023-11-13