Skip to content

Access by Name#

Sometimes, you may need to use a different name for the argument (not the one under which it is stored in the context) or get access to specific parts of the object. To do this, simply specify the name of what you want to access, and the context will provide you with the object.

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.message import KafkaMessage

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: KafkaMessage = Context("message"),
    correlation_id: str = Context("message.correlation_id"),
    user_header: str = Context("message.headers.user"),
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header

This way you can get access to context object by its name

msg: KafkaMessage = Context("message"),
from faststream import Context, FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.message import KafkaMessage

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: KafkaMessage = Context("message"),
    correlation_id: str = Context("message.correlation_id"),
    user_header: str = Context("message.headers.user"),
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header

This way you can get access to context object by its name

msg: KafkaMessage = Context("message"),
from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.message import RabbitMessage

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle(
    msg: RabbitMessage = Context("message"),
    correlation_id: str = Context("message.correlation_id"),
    user_header: str = Context("message.headers.user"),
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header

This way you can get access to context object by its name

msg: RabbitMessage = Context("message"),
from faststream import Context, FastStream
from faststream.nats import NatsBroker
from faststream.nats.message import NatsMessage

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle(
    msg: NatsMessage = Context("message"),
    correlation_id: str = Context("message.correlation_id"),
    user_header: str = Context("message.headers.user"),
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header

This way you can get access to context object by its name

msg: NatsMessage = Context("message"),
from faststream import Context, FastStream
from faststream.redis import RedisBroker
from faststream.redis.message import RedisMessage

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle(
    msg: RedisMessage = Context("message"),
    correlation_id: str = Context("message.correlation_id"),
    user_header: str = Context("message.headers.user"),
):
    assert msg.correlation_id == correlation_id
    assert msg.headers["user"] == user_header

This way you can get access to context object by its name

msg: RedisMessage = Context("message"),

This way you can get access to context object specific field

correlation_id: str = Context("message.correlation_id"),

Or even to a dict key

user_header: str = Context("message.headers.user"),