Skip to content

RabbitResponse

faststream.rabbit.RabbitResponse #

RabbitResponse(body, *, headers=None, correlation_id=None, message_id=None, mandatory=True, immediate=False, timeout=None, persist=None, priority=None, message_type=None, content_type=None, expiration=None, content_encoding=None)

Bases: Response

Source code in faststream/rabbit/response.py
def __init__(
    self,
    body: "AioPikaSendableMessage",
    *,
    headers: Optional["AnyDict"] = None,
    correlation_id: Optional[str] = None,
    message_id: Optional[str] = None,
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    persist: Optional[bool] = None,
    priority: Optional[int] = None,
    message_type: Optional[str] = None,
    content_type: Optional[str] = None,
    expiration: Optional["DateType"] = None,
    content_encoding: Optional[str] = None,
) -> None:
    super().__init__(
        body=body,
        headers=headers,
        correlation_id=correlation_id,
    )

    self.message_id = message_id
    self.mandatory = mandatory
    self.immediate = immediate
    self.timeout = timeout
    self.persist = persist
    self.priority = priority
    self.message_type = message_type
    self.content_type = content_type
    self.expiration = expiration
    self.content_encoding = content_encoding

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

message_id instance-attribute #

message_id = message_id

mandatory instance-attribute #

mandatory = mandatory

immediate instance-attribute #

immediate = immediate

timeout instance-attribute #

timeout = timeout

persist instance-attribute #

persist = persist

priority instance-attribute #

priority = priority

message_type instance-attribute #

message_type = message_type

content_type instance-attribute #

content_type = content_type

expiration instance-attribute #

expiration = expiration

content_encoding instance-attribute #

content_encoding = content_encoding

add_headers #

add_headers(extra_headers, *, override=True)
Source code in faststream/broker/response.py
def add_headers(
    self,
    extra_headers: "AnyDict",
    *,
    override: bool = True,
) -> None:
    if override:
        self.headers = {**self.headers, **extra_headers}
    else:
        self.headers = {**extra_headers, **self.headers}

as_publish_kwargs #

as_publish_kwargs()
Source code in faststream/rabbit/response.py
@override
def as_publish_kwargs(self) -> "AnyDict":
    publish_options = {
        **super().as_publish_kwargs(),
        "message_id": self.message_id,
        "mandatory": self.mandatory,
        "immediate": self.immediate,
        "timeout": self.timeout,
        "persist": self.persist,
        "priority": self.priority,
        "message_type": self.message_type,
        "content_type": self.content_type,
        "expiration": self.expiration,
        "content_encoding": self.content_encoding,
    }
    return publish_options