Access to Message Information
As you may know, FastStream deserializes the incoming message body and passes it to your handler through function arguments. Sometimes, however, you also need other message attributes such as offsets, headers, or other metadata.
Message Access
You can access this information through the message object in the Context.
This object is a unified FastStream wrapper around the native broker library message (for example, aiokafka.ConsumerRecord in the case of Kafka). It contains most of the required information, including:
body: bytes
checksum: int
headers: Sequence[Tuple[str, bytes]]
key: Optional[aiokafka.structs.KT]
offset: int
partition: int
serialized_key_size: int
serialized_value_size: int
timestamp: int
timestamp_type: int
topic: str
value: Optional[aiokafka.structs.VT]
For example, to access the headers of an incoming message, annotate a handler argument with KafkaMessage:
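from faststream.kafka import KafkaBroker, KafkaMessage

# The broker address is an assumption; adjust it to your setup.
broker = KafkaBroker("localhost:9092")

@broker.subscriber("test")
async def base_handler(
    body: str,
    msg: KafkaMessage,  # injected from the Context by its type annotation
):
    print(msg.headers)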
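To make the idea of a "unified wrapper around the native message" more concrete, here is a minimal sketch in plain Python. It is not FastStream's implementation: RawRecord and WrappedMessage are hypothetical names invented for illustration, and the only point is that a wrapper can expose common fields (such as decoded headers) while keeping the native record reachable for broker-specific details like the offset.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Sequence, Tuple


@dataclass
class RawRecord:
    """Hypothetical stand-in for a native broker record (not aiokafka's class)."""

    topic: str
    partition: int
    offset: int
    value: Optional[bytes]
    headers: Sequence[Tuple[str, bytes]] = ()


@dataclass
class WrappedMessage:
    """Hypothetical unified wrapper: common fields plus the raw record."""

    body: Optional[bytes]
    headers: Dict[str, str]
    raw_message: Any

    @classmethod
    def from_record(cls, record: RawRecord) -> "WrappedMessage":
        # Decode header values once so handlers can read them as text.
        decoded = {name: value.decode("utf-8") for name, value in record.headers}
        return cls(body=record.value, headers=decoded, raw_message=record)


record = RawRecord(
    topic="test",
    partition=0,
    offset=42,
    value=b"hello",
    headers=[("content-type", b"text/plain")],
)
msg = WrappedMessage.from_record(record)
print(msg.headers)             # {'content-type': 'text/plain'}
print(msg.raw_message.offset)  # 42
```

The design choice worth noting is that the wrapper keeps a reference to the raw record instead of copying every field, so broker-specific attributes stay available without widening the common interface.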
Message Fields Access
In most cases, you need only a few message fields rather than the whole message. The Context fields access feature lets you request just those fields.
For example, you can access the headers like this:
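from faststream import Context
from faststream.kafka import KafkaBroker

# The broker address is an assumption; adjust it to your setup.
broker = KafkaBroker("localhost:9092")

@broker.subscriber("test")
async def base_handler(
    body: str,
    # "message.headers" is looked up in the Context: the headers of the current message
    headers: dict = Context("message.headers"),
):
    print(headers)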
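A dotted path such as "message.headers" can be read as "take the object stored under message, then read its headers". The sketch below imitates that lookup in plain Python so the behaviour is easy to experiment with. It is an illustration under stated assumptions, not FastStream's resolver: the resolve function and the sample context are hypothetical, and the real library may handle missing keys, defaults, and nesting differently.

```python
from types import SimpleNamespace
from typing import Any, Dict


def resolve(context: Dict[str, Any], path: str) -> Any:
    """Resolve a dotted path such as "message.headers" against a context dict.

    The first segment is looked up as a key in the context. Each later
    segment is read as a dict key when the current value is a dict,
    otherwise as an attribute.
    """
    first, *rest = path.split(".")
    value = context[first]
    for name in rest:
        if isinstance(value, dict):
            value = value[name]
        else:
            value = getattr(value, name)
    return value


# A hypothetical context holding one message-like object.
context = {
    "message": SimpleNamespace(
        headers={"content-type": "text/plain"},
        raw_message=SimpleNamespace(offset=42, partition=0),
    )
}

print(resolve(context, "message.headers"))               # {'content-type': 'text/plain'}
print(resolve(context, "message.raw_message.offset"))    # 42
print(resolve(context, "message.headers.content-type"))  # text/plain
```

Requesting a single field this way keeps the handler signature honest about what it actually uses, which also makes the handler easier to test with a small fake context.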