Access to Message Information#
As you know, FastStream serializes a message body and provides you access to it through function arguments. But sometimes you want access to a message_id, headers, or other meta-information.
Message Access#
You can get it in a simple way: just acces to the message object in the Context!
It is a FastStream wrapper around a native broker library message (aio_pika.IncomingMessage
in the RabbitMQ case). It contains the required information such as:
body: bytes
decoded_body: Any
content_type: str
reply_to: str
headers: dict[str, Any]
message_id: str
correlation_id: str
from faststream.rabbit import RabbitMessage
@broker.subscriber("test")
async def base_handler(
body: str,
msg: RabbitMessage,
):
print(msg.correlation_id)
Also, if you can't find the information you reqiure, you can get access directly to the wrapped aio_pika.IncomingMessage
, which contains complete message information.
from aio_pika import IncomingMessage
from faststream.rabbit import RabbitMessage
@broker.subscriber("test")
async def base_handler(body: str, msg: RabbitMessage):
raw: IncomingMessage = msg.raw_message
print(raw)
Message Fields Access#
But in the most cases, you don't need all message fields; you need to access some of them. You can use Context Fields access feature for this reason.
For example, you can get access to the correlation_id
like this:
from faststream import Context
@broker.subscriber("test")
async def base_handler(
body: str,
cor_id: str = Context("message.correlation_id"),
):
print(cor_id)
Or even directly from the raw message:
from faststream import Context
@broker.subscriber("test")
async def base_handler(
body: str,
cor_id: str = Context("message.raw_message.correlation_id"),
):
print(cor_id)
But this code is too long to be reused everywhere. In this case, you can use a Python Annotated
feature: