
Access to Message Information

As you may know, FastStream deserializes the message body and passes it to your handler through function arguments. Sometimes, however, you also need other message attributes, such as offsets, headers, or other metadata.

Message Access

You can access this information through the message object stored in the Context.

This object serves as a unified FastStream wrapper around the native broker library message (for example, aiokafka.ConsumerRecord in the case of Kafka). It contains most of the required information, including:

  • body: bytes
  • checksum: int
  • headers: Sequence[Tuple[str, bytes]]
  • key: Optional[aiokafka.structs.KT]
  • offset: int
  • partition: int
  • serialized_key_size: int
  • serialized_value_size: int
  • timestamp: int
  • timestamp_type: int
  • topic: str
  • value: Optional[aiokafka.structs.VT]

For example, to access the headers of an incoming message, annotate a handler argument with the message type:

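from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaMessage

# Assumed local broker address; adjust for your setup.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test")
async def base_handler(
    body: str,
    msg: KafkaMessage,  # the FastStream message wrapper taken from the Context
):
    print(msg.headers)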
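
If the headers reach you in the native form listed above, Sequence[Tuple[str, bytes]], you will usually want to decode them before use. The following is a minimal sketch using only the standard library. The helper name decode_headers and the sample header pairs are hypothetical, and depending on your FastStream version the wrapper may already give you decoded headers, in which case this step is unnecessary.

```python
from typing import Dict, Sequence, Tuple


def decode_headers(
    raw_headers: Sequence[Tuple[str, bytes]],
    encoding: str = "utf-8",
) -> Dict[str, str]:
    """Turn Kafka-style (key, bytes) header pairs into a dict of strings.

    If a key appears more than once, the last value wins.
    """
    return {key: value.decode(encoding) for key, value in raw_headers}


# Hypothetical header pairs, shaped like the `headers` field listed above.
sample = [("content-type", b"application/json"), ("trace-id", b"abc123")]
decoded = decode_headers(sample)
print(decoded["content-type"])
```

Collapsing the pairs into a dict is convenient for lookups, but it drops repeated keys; keep the original sequence if duplicates matter to you.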

Message Fields Access

In most cases, you don't need every message field, only a few of them. The Context fields access feature lets you request just those fields.

For example, you can access the headers alone like this:

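from typing import Any

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker

# Assumed local broker address; adjust for your setup.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test")
async def base_handler(
    body: str,
    headers: Any = Context("message.headers"),  # a header collection, not a str
):
    print(headers)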
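
To make the "message.headers" string less opaque: conceptually, a dotted path like this is walked one segment at a time, starting from a named object in the context. The sketch below illustrates that idea with the standard library only. FakeMessage and resolve_path are hypothetical names invented for the illustration, and this is not FastStream's actual implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a broker message wrapper."""

    body: bytes
    headers: Dict[str, str] = field(default_factory=dict)


def resolve_path(context: Mapping[str, Any], path: str) -> Any:
    """Walk a dotted path: key lookup on mappings, attribute lookup otherwise."""
    first, *rest = path.split(".")
    value: Any = context[first]
    for name in rest:
        value = value[name] if isinstance(value, Mapping) else getattr(value, name)
    return value


# A toy context holding one message under the name "message".
context = {"message": FakeMessage(body=b"hi", headers={"trace-id": "abc123"})}

headers = resolve_path(context, "message.headers")
trace_id = resolve_path(context, "message.headers.trace-id")
print(headers, trace_id)
```

Requesting a single field this way keeps the handler signature focused on what it actually uses, instead of pulling in the whole message object.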

Last update: 2023-09-25