Skip to content

StreamMessage

faststream.broker.fastapi.route.StreamMessage #

StreamMessage(*, body, headers, path)

Bases: Request

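A `Request` subclass that represents a stream message, built directly from a message body, headers, and path parameters rather than from an ASGI scope.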

Initialize a class instance.

Source code in faststream/broker/fastapi/route.py
def __init__(
    self,
    *,
    body: Union["AnyDict", List[Any]],
    headers: "AnyDict",
    path: "AnyDict",
) -> None:
    """Store the parts of a stream message on the instance.

    Args:
        body: Message payload, kept as ``_body``.
        headers: Message headers, kept as ``_headers``.
        path: Path parameters, kept as ``_query_params`` and also exposed
            through ``scope["path_params"]``.
    """
    self._body = body
    self._headers = headers
    self._cookies = {}

    # The same mapping object backs both the cached query params and the
    # "path_params" entry of the scope.
    self._query_params = path
    self.scope = {"path_params": path}

app property #

app

url property #

url

base_url property #

base_url

headers property #

headers

query_params property #

query_params

path_params property #

path_params

cookies property #

cookies

client property #

client

session property #

session

auth property #

auth

user property #

user

state property #

state

method property #

method

receive property #

receive

scope instance-attribute #

scope = {'path_params': _query_params}

url_for #

url_for(name, /, **path_params)
Source code in starlette/requests.py
def url_for(self, name: str, /, **path_params: typing.Any) -> URL:
    """Build an absolute URL for the route called ``name``.

    The scope's ``"router"`` entry is used to resolve the route; when it is
    missing or falsy, the ``"app"`` entry is used instead.

    Args:
        name: Route name handed to ``url_path_for``.
        **path_params: Forwarded unchanged to ``url_path_for``.

    Returns:
        The resolved path made absolute against ``self.base_url``.

    Raises:
        RuntimeError: If the scope provides neither a router nor an app.
    """
    scope = self.scope
    provider: Router | Starlette | None = scope.get("router") or scope.get("app")
    if provider is not None:
        resolved = provider.url_path_for(name, **path_params)
        return resolved.make_absolute_url(base_url=self.base_url)
    raise RuntimeError("The `url_for` method can only be used inside a Starlette application or with a router.")

stream async #

stream()
Source code in starlette/requests.py
async def stream(self) -> typing.AsyncGenerator[bytes, None]:
    """Yield the request body in chunks, always ending with an empty chunk.

    If ``_body`` is already set it is yielded as a single chunk and nothing
    is received. Otherwise messages are pulled from ``_receive`` until one
    arrives without ``more_body`` set.

    Raises:
        RuntimeError: If no body is cached and the stream was already consumed.
        ClientDisconnect: If an ``"http.disconnect"`` message is received.
    """
    # A cached body short-circuits the receive loop entirely.
    if hasattr(self, "_body"):
        yield self._body
        yield b""
        return

    if self._stream_consumed:
        raise RuntimeError("Stream consumed")

    while not self._stream_consumed:
        event = await self._receive()
        event_type = event["type"]
        if event_type == "http.disconnect":
            self._is_disconnected = True
            raise ClientDisconnect()
        if event_type != "http.request":
            # Any other message type is ignored; keep receiving.
            continue
        chunk = event.get("body", b"")
        # Mark the stream as consumed before handing out the final chunk, so
        # the flag is set even if the caller stops iterating right after it.
        if not event.get("more_body", False):
            self._stream_consumed = True
        if chunk:
            yield chunk
    yield b""

body async #

body()
Source code in starlette/requests.py
async def body(self) -> bytes:
    """Return the whole request body, reading the stream at most once.

    The first call joins every chunk produced by ``stream()`` and caches the
    result on ``_body``; later calls return that cached value.
    """
    if hasattr(self, "_body"):
        return self._body

    pieces: list[bytes] = [piece async for piece in self.stream()]
    self._body = b"".join(pieces)
    return self._body

json async #

json()
Source code in starlette/requests.py
async def json(self) -> typing.Any:
    """Return the request body decoded with ``json.loads``.

    The decoded value is cached on ``_json``, so the body is parsed only on
    the first call.
    """
    if not hasattr(self, "_json"):  # pragma: no branch
        self._json = json.loads(await self.body())
    return self._json

form #

form(*, max_files=1000, max_fields=1000)
Source code in starlette/requests.py
def form(
    self, *, max_files: int | float = 1000, max_fields: int | float = 1000
) -> AwaitableOrContextManager[FormData]:
    """Return the form wrapped in ``AwaitableOrContextManagerWrapper``.

    Args:
        max_files: Forwarded to ``_get_form`` as ``max_files``.
        max_fields: Forwarded to ``_get_form`` as ``max_fields``.
    """
    pending_form = self._get_form(max_files=max_files, max_fields=max_fields)
    return AwaitableOrContextManagerWrapper(pending_form)

close async #

close()
Source code in starlette/requests.py
async def close(self) -> None:
    """Close the form held in ``_form``; do nothing when it is ``None``."""
    parsed_form = self._form
    if parsed_form is not None:  # pragma: no branch
        await parsed_form.close()

is_disconnected async #

is_disconnected()
Source code in starlette/requests.py
async def is_disconnected(self) -> bool:
    """Report whether an ``"http.disconnect"`` message has been seen.

    Once the flag is set it is returned directly. Otherwise a single receive
    is attempted and the flag is set if that message is a disconnect.
    """
    if self._is_disconnected:
        return self._is_disconnected

    event: Message = {}

    # The scope is cancelled before the receive is awaited, so when no
    # message is immediately available we move on with the empty placeholder.
    with anyio.CancelScope() as scope:
        scope.cancel()
        event = await self._receive()

    if event.get("type") == "http.disconnect":
        self._is_disconnected = True

    return self._is_disconnected

send_push_promise async #

send_push_promise(path)
Source code in starlette/requests.py
async def send_push_promise(self, path: str) -> None:
    """Send an ``"http.response.push"`` message for ``path`` when available.

    Nothing is sent unless the scope's ``"extensions"`` mapping contains
    ``"http.response.push"``. The message carries, latin-1 encoded, every
    value of each header named in ``SERVER_PUSH_HEADERS_TO_COPY``.

    Args:
        path: Value placed in the ``"path"`` field of the pushed message.
    """
    if "http.response.push" not in self.scope.get("extensions", {}):
        return

    raw_headers: list[tuple[bytes, bytes]] = [
        (header.encode("latin-1"), item.encode("latin-1"))
        for header in SERVER_PUSH_HEADERS_TO_COPY
        for item in self.headers.getlist(header)
    ]
    await self._send({"type": "http.response.push", "path": path, "headers": raw_headers})