KafkaResponse faststream.confluent.response.KafkaResponse # KafkaResponse(body, *, headers=None, correlation_id=None, timestamp_ms=None, key=None) Bases: Response Source code in faststream/confluent/response.py 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28def __init__( self, body: "SendableMessage", *, headers: Optional["AnyDict"] = None, correlation_id: Optional[str] = None, timestamp_ms: Optional[int] = None, key: Optional[bytes] = None, ) -> None: super().__init__( body=body, headers=headers, correlation_id=correlation_id, ) self.timestamp_ms = timestamp_ms self.key = key body instance-attribute # body = body headers instance-attribute # headers = headers or {} correlation_id instance-attribute # correlation_id = correlation_id timestamp_ms instance-attribute # timestamp_ms = timestamp_ms key instance-attribute # key = key add_headers # add_headers(extra_headers, *, override=True) Source code in faststream/broker/response.py 20 21 22 23 24 25 26 27 28 29def add_headers( self, extra_headers: "AnyDict", *, override: bool = True, ) -> None: if override: self.headers = {**self.headers, **extra_headers} else: self.headers = {**extra_headers, **self.headers} as_publish_kwargs # as_publish_kwargs() Source code in faststream/confluent/response.py 30 31 32 33 34 35 36 37@override def as_publish_kwargs(self) -> "AnyDict": publish_options = { **super().as_publish_kwargs(), "timestamp_ms": self.timestamp_ms, "key": self.key, } return publish_options