Access to Message Information#

As you know, FastStream deserializes the message body and passes it to your handler through function arguments. But sometimes you also need the message_id, headers, or other meta-information.

Message Access#

You can get it in a simple way: just access the message object in the Context.

It contains the required information such as:

  • body: bytes
  • decoded_body: Any
  • content_type: str
  • reply_to: str
  • headers: dict[str, Any]
  • path: dict[str, Any]
  • message_id: str
  • correlation_id: str

It is a FastStream wrapper around the native broker library message (nats.aio.msg.Msg in the case of NATS), which you can access through raw_message.

from faststream.nats import NatsMessage

@broker.subscriber("test")
async def base_handler(
    body: str,
    msg: NatsMessage,
):
    print(msg.correlation_id)

Also, if you can't find the information you require, you can get access directly to the wrapped nats.aio.msg.Msg, which contains complete message information.

from nats.aio.msg import Msg
from faststream.nats import NatsMessage

@broker.subscriber("test")
async def base_handler(body: str, msg: NatsMessage):
    raw: Msg = msg.raw_message
    print(raw)
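
To make the relationship between the wrapper and the raw message concrete, here is a minimal stdlib-only sketch. It is not FastStream's implementation: FakeRawMsg, WrappedMessage, and wrap are hypothetical names, and reading the correlation id from a "correlation_id" header is an assumption made only for illustration.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeRawMsg:
    """Hypothetical stand-in for a native broker message (not nats.aio.msg.Msg)."""

    subject: str
    data: bytes
    reply: str = ""
    headers: dict = field(default_factory=dict)


@dataclass
class WrappedMessage:
    """Illustrative wrapper exposing the fields listed above."""

    raw_message: FakeRawMsg
    body: bytes
    decoded_body: Any = None
    content_type: str = ""
    reply_to: str = ""
    headers: dict = field(default_factory=dict)
    path: dict = field(default_factory=dict)
    message_id: str = ""
    correlation_id: str = ""


def wrap(raw: FakeRawMsg) -> WrappedMessage:
    """Copy the commonly used fields out of the raw message, keeping a reference to it."""
    headers = dict(raw.headers)
    return WrappedMessage(
        raw_message=raw,
        body=raw.data,
        reply_to=raw.reply,
        headers=headers,
        # Assumption: the correlation id travels in a "correlation_id" header.
        correlation_id=headers.get("correlation_id", ""),
    )


raw = FakeRawMsg(subject="test", data=b"hi", headers={"correlation_id": "abc"})
msg = wrap(raw)
print(msg.correlation_id, msg.raw_message.subject)
```

The wrapper keeps the convenient fields at the top level, while raw_message remains available for anything the wrapper does not expose.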

Message Fields Access#

But in most cases, you don't need all of the message fields, only some of them. You can use the Context fields access feature for this.

For example, you can access the correlation_id like this:

from faststream import Context

@broker.subscriber("test")
async def base_handler(
    body: str,
    cor_id: str = Context("message.correlation_id"),
):
    print(cor_id)

Or even directly from the raw message:

from faststream import Context

@broker.subscriber("test")
async def base_handler(
    body: str,
    # nats.aio.msg.Msg exposes `subject`; it has no `correlation_id` attribute
    subject: str = Context("message.raw_message.subject"),
):
    print(subject)
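
The dotted strings passed to Context can be read as a path walked one segment at a time. The sketch below shows one way such a lookup could work, using only the standard library. It is an illustration, not FastStream's resolver: resolve is a hypothetical helper, and the message here is a SimpleNamespace stand-in.

```python
from types import SimpleNamespace
from typing import Any


def resolve(context: dict, dotted_path: str) -> Any:
    """Walk a dotted path, using dict keys for mappings and attributes otherwise."""
    first, *rest = dotted_path.split(".")
    current = context[first]
    for part in rest:
        if isinstance(current, dict):
            current = current[part]
        else:
            current = getattr(current, part)
    return current


# Stand-in objects, not real FastStream or nats-py types.
message = SimpleNamespace(
    correlation_id="abc-123",
    headers={"user": "john"},
    raw_message=SimpleNamespace(subject="test"),
)
context = {"message": message}

cor_id = resolve(context, "message.correlation_id")
user = resolve(context, "message.headers.user")
subject = resolve(context, "message.raw_message.subject")
print(cor_id, user, subject)
```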

But this code is too long to repeat everywhere. In this case, you can use Python's Annotated feature:

from typing import Annotated
from faststream import Context

CorrelationId = Annotated[str, Context("message.correlation_id")]

@broker.subscriber("test")
async def base_handler(
    body: str,
    cor_id: CorrelationId,
):
    print(cor_id)

On Python versions older than 3.9, import Annotated from typing_extensions instead:

from typing_extensions import Annotated
from faststream import Context

CorrelationId = Annotated[str, Context("message.correlation_id")]

@broker.subscriber("test")
async def base_handler(
    body: str,
    cor_id: CorrelationId,
):
    print(cor_id)
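
Annotated works for this because the extra metadata stays attached to the type and can be read back at runtime. The following sketch shows the mechanism with the standard library only (Python 3.9+). Marker and find_markers are hypothetical names standing in for an object like Context(...) and for whatever inspection a framework performs.

```python
from typing import Annotated, get_args, get_origin, get_type_hints


class Marker:
    """Hypothetical marker, standing in for an object like Context(...)."""

    def __init__(self, path: str) -> None:
        self.path = path


CorrelationId = Annotated[str, Marker("message.correlation_id")]


async def base_handler(body: str, cor_id: CorrelationId) -> None:
    print(cor_id)


def find_markers(func) -> dict:
    """Map each parameter name to (base type, marker path) when a Marker is attached."""
    hints = get_type_hints(func, include_extras=True)
    found = {}
    for name, hint in hints.items():
        if get_origin(hint) is Annotated:
            base, *extras = get_args(hint)
            for extra in extras:
                if isinstance(extra, Marker):
                    found[name] = (base, extra.path)
    return found


markers = find_markers(base_handler)
print(markers)
```

Because the alias carries its own metadata, CorrelationId can be reused in any handler signature without repeating the path string.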

Headers Access#

You can, of course, access the raw message and read the headers dict itself, but more often you just need a single header field. You can access it easily using the Context:

from faststream import Context

@broker.subscriber("test")
async def base_handler(
    body: str,
    user: str = Context("message.headers.user"),
):
    ...

Using the special Header class is more convenient, as it also validates the header value using Pydantic. It works the same way as Context, because it is just a shortcut to Context with a default setup. So, you already know how to use it:

from faststream import Header

@broker.subscriber("test")
async def base_handler(
    body: str,
    user: str = Header(),
):
    ...
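
The example above relies on the parameter name (user) doubling as the header key. A rough stdlib-only sketch of that idea follows. HeaderMarker and bind_headers are hypothetical, and the plain type conversion is only a crude stand-in for the Pydantic validation mentioned above.

```python
import inspect
from typing import Any, Callable


class HeaderMarker:
    """Hypothetical default value meaning 'fill this parameter from the headers'."""


def bind_headers(func: Callable[..., Any], headers: dict) -> dict:
    """Build keyword arguments for every parameter whose default is a HeaderMarker.

    The header key is the parameter name; the value is converted by calling
    the parameter's annotation, if there is one.
    """
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, HeaderMarker):
            value = headers[name]
            if param.annotation is not inspect.Parameter.empty:
                value = param.annotation(value)
            kwargs[name] = value
    return kwargs


def handler(body: str, user: str = HeaderMarker(), retries: int = HeaderMarker()):
    return user, retries


kwargs = bind_headers(handler, {"user": "john", "retries": "3"})
result = handler("hello", **kwargs)
print(result)
```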

Subject Pattern Access#

As you know, NATS allows you to subscribe to subjects using a pattern such as logs.*. Getting the actual value matched by * is a common need, and FastStream provides it through the Path object (which is a shortcut to Context("message.path.*")).

To use it, you just need to replace your * with {variable-name} and use Path as a regular Context object:

from faststream import Path

@broker.subscriber("logs.{level}")
async def base_handler(
    body: str,
    level: str = Path(),
):
    ...
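
Conceptually, a pattern such as logs.{level} has to become a NATS wildcard subject for the subscription and a way to pull the named value back out of the concrete subject. The sketch below illustrates both steps with the standard library. The helper names are hypothetical, and this is an assumption about how such a mapping could be built, not a description of FastStream internals.

```python
import re
from typing import Optional

# Matches a whole-token placeholder such as {level}.
PLACEHOLDER = re.compile(r"\{(\w+)\}")


def to_wildcard_subject(pattern: str) -> str:
    """Replace every {name} placeholder with the NATS single-token wildcard."""
    return PLACEHOLDER.sub("*", pattern)


def compile_pattern(pattern: str) -> "re.Pattern[str]":
    """Build a regex with one named group per {name} placeholder."""
    parts = []
    for token in pattern.split("."):
        match = PLACEHOLDER.fullmatch(token)
        if match:
            # A single NATS token never contains a dot.
            parts.append(f"(?P<{match.group(1)}>[^.]+)")
        else:
            parts.append(re.escape(token))
    return re.compile(r"\.".join(parts))


def extract_path(pattern: str, subject: str) -> Optional[dict]:
    """Return the placeholder values for a subject, or None if it does not match."""
    match = compile_pattern(pattern).fullmatch(subject)
    return match.groupdict() if match else None


print(to_wildcard_subject("logs.{level}"))
print(extract_path("logs.{level}", "logs.info"))
```

The dictionary returned by extract_path plays the role of the path field on the message, which is what Path reads from.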

Last update: 2023-10-15