Access to Message Information#
As you know, FastStream serializes a message body and provides you access to it through function arguments. But sometimes you need to access message_id, headers, or other meta-information.
Message Access#
You can get it in a simple way: just acces the message object in the Context.
It contains the required information such as:
body: bytes
decoded_body: Any
content_type: str
reply_to: str
headers: dict[str, Any]
path: dict[str, Any]
message_id: str
correlation_id: str
It is a FastStream wrapper around a native broker library message (nats.aio.msg.Msg
in the NATS' case) that you can access with raw_message
.
from faststream.nats import NatsMessage
@broker.subscriber("test")
async def base_handler(
body: str,
msg: NatsMessage,
):
print(msg.correlation_id)
Also, if you can't find the information you require, you can get access directly to the wrapped nats.aio.msg.Msg
, which contains complete message information.
from nats.aio.msg import Msg
from faststream.nats import NatsMessage
@broker.subscriber("test")
async def base_handler(body: str, msg: NatsMessage):
raw: Msg = msg.raw_message
print(raw)
Message Fields Access#
But in most cases, you don't need all message fields; you need to access some of them. You can use Context Fields access feature for this reason.
For example, you can access the correlation_id
like this:
from faststream import Context
@broker.subscriber("test")
async def base_handler(
body: str,
cor_id: str = Context("message.correlation_id"),
):
print(cor_id)
Or even directly from the raw message:
from faststream import Context
@broker.subscriber("test")
async def base_handler(
body: str,
cor_id: str = Context("message.raw_message.correlation_id"),
):
print(cor_id)
But this code is too long to reuse everywhere. In this case, you can use a Python Annotated
feature:
Headers Access#
Sure, you can get access to a raw message and get the headers dict itself, but more often you just need a single header field. So, you can easily access it using the Context
:
from faststream import Context
@broker.subscriber("test")
async def base_handler(
body: str,
user: str = Context("message.headers.user"),
):
...
Using the special Header
class is more convenient, as it also validates the header value using Pydantic. It works the same way as Context
, but it is just a shorcut to Context
with a default setup. So, you already know how to use it:
from faststream import Header
@broker.subscriber("test")
async def base_handler(
body: str,
user: str = Header(),
):
...
Subject Pattern Access#
As you know, NATS allows you to use a pattern like this logs.*
to subscriber to subjects. Getting access to the real *
value is an often-used scenario, and FastStream provide it to you with the Path
object (which is a shortcut to Context("message.path.*")
).
To use it, you just need to replace your *
with {variable-name}
and use Path
as a regular Context
object:
from faststream import Path
@broker.subscriber("logs.{level}")
async def base_handler(
body: str,
level: str = Path(),
):
...