Skip to content

NatsResponse

faststream.nats.NatsResponse #

NatsResponse(body, *, headers=None, correlation_id=None, stream=None)

Bases: Response

Source code in faststream/nats/response.py
def __init__(
    self,
    body: "SendableMessage",
    *,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    stream: Optional[str] = None,
) -> None:
    """Create a response and remember the stream to attach to it.

    Args:
        body: Message payload; forwarded unchanged to the base ``Response``.
        headers: Optional message headers; forwarded to the base ``Response``.
        correlation_id: Optional correlation id; forwarded to the base
            ``Response``.
        stream: Optional stream name stored on the instance as
            ``self.stream``. Presumably the NATS JetStream stream to
            publish into -- confirm against the publisher.
    """
    # The base class owns body/headers/correlation_id handling.
    super().__init__(
        body=body,
        headers=headers,
        correlation_id=correlation_id,
    )
    # NATS-specific extra kept on the instance alongside the base fields.
    self.stream = stream

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

stream instance-attribute #

stream = stream

add_headers #

add_headers(extra_headers, *, override=True)
Source code in faststream/broker/response.py
def add_headers(
    self,
    extra_headers: "AnyDict",
    *,
    override: bool = True,
) -> None:
    """Merge ``extra_headers`` into ``self.headers``.

    ``self.headers`` is rebound to a newly built dict; the previous dict
    object is not mutated.

    Args:
        extra_headers: Headers to merge in.
        override: When truthy, values from ``extra_headers`` replace existing
            values for duplicate keys; otherwise existing values are kept.
    """
    current = self.headers
    # In a dict merge the right-hand operand wins on duplicate keys, so the
    # side that should take precedence is placed last.
    if override:
        base, winner = current, extra_headers
    else:
        base, winner = extra_headers, current
    self.headers = {**base, **winner}

as_publish_kwargs #

as_publish_kwargs()
Source code in faststream/nats/response.py
@override
def as_publish_kwargs(self) -> "AnyDict":
    """Return the parent's publish kwargs with a ``"stream"`` entry added.

    A new dict is built from the parent's result. The ``"stream"`` key is
    always set to ``self.stream``, replacing any value the parent supplied
    under that key.
    """
    return {**super().as_publish_kwargs(), "stream": self.stream}