Existing Fields

Context already contains some global objects that you can always access:

  • broker - the current broker
  • context - the context itself, in which you can write your own fields
  • logger - the logger used for your broker (tags messages with message_id)
  • message - the raw message (if you need access to it)

At the same time, thanks to contextvars.ContextVar, message is local to your current consumer scope.
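
The scoping behaviour can be illustrated with the standard library alone. The following is a minimal sketch, not FastStream's internal code: `current_message` and `consume` are hypothetical names, and each asyncio task stands in for one consumer call.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical stand-in for the per-message slot; not FastStream's own variable.
current_message = ContextVar("current_message")


async def consume(body):
    # Each asyncio task runs in its own copy of the context,
    # so this set() is invisible to the other consumers.
    current_message.set(body)
    await asyncio.sleep(0)  # yield control so the tasks interleave
    return current_message.get()


async def main():
    # gather() wraps every coroutine in a separate task.
    return await asyncio.gather(consume("first"), consume("second"), consume("third"))


results = asyncio.run(main())
print(results)
```

Every task reads back the value it set itself, even though all three share one variable and run interleaved.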

Access to Context Fields

By default, the context searches for an object based on the argument name.

  • Kafka
from faststream import Context, FastStream
from faststream.kafka import KafkaBroker


broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)


@broker_object.subscriber("test-topic")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")
  • RabbitMQ
from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker


broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)


@broker_object.subscriber("test-queue")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")
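
The name-based lookup described above can be pictured with a small standard-library sketch. It is not how FastStream is implemented: `FromContext`, `registry`, and `resolve_by_name` are hypothetical names used only to show the idea of matching a parameter name against a context key.

```python
import inspect

# Hypothetical registry standing in for the global context.
registry = {"logger": "<logger>", "broker": "<broker>", "message": "<raw message>"}


class FromContext:
    """Hypothetical marker default, playing the role of Context() in this sketch."""


def resolve_by_name(func, store):
    """Build keyword arguments for every parameter whose default is a FromContext marker."""
    resolved = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, FromContext):
            # The lookup key is simply the parameter name.
            resolved[name] = store[name]
    return resolved


def handle(msg, logger=FromContext(), broker=FromContext()):
    return msg, logger, broker


kwargs = resolve_by_name(handle, registry)
result = handle("hello", **kwargs)
print(result)
```

Only parameters marked with the placeholder default are filled from the registry; `msg` is still passed by the caller.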

Annotated Aliases

FastStream also provides ready-made Annotated aliases for convenient access to these existing objects. You can import them directly from faststream or from your broker-specific modules:

  • Shared aliases
from faststream import Logger, ContextRepo
  • Kafka aliases
from faststream.kafka.annotations import (
    Logger, ContextRepo, KafkaMessage, KafkaBroker, KafkaProducer
)
  • RabbitMQ aliases
from faststream.rabbit.annotations import (
    Logger, ContextRepo, RabbitMessage, RabbitBroker, RabbitProducer
)

To use them, import the aliases and annotate your subscriber arguments with them.

  • Kafka
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    ContextRepo,
    KafkaMessage,
    Logger,
    KafkaBroker as BrokerAnnotation,
)

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)


@broker_object.subscriber("response-topic")
async def handle_response(
    msg: str,
    logger: Logger,
    message: KafkaMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
  • RabbitMQ
from faststream import FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import (
    ContextRepo,
    RabbitMessage,
    Logger,
    RabbitBroker as BrokerAnnotation,
)

broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)


@broker_object.subscriber("response-queue")
async def handle_response(
    msg: str,
    logger: Logger,
    message: RabbitMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
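
As a rough illustration of how an Annotated alias can carry lookup information next to the real type, here is a standard-library sketch. The `FromContext` marker, the `Logger` alias defined here, and `context_keys` are all hypothetical; the actual definitions in FastStream may differ.

```python
import logging
from typing import Annotated, get_type_hints


class FromContext:
    """Hypothetical marker; stands in for the Context() object in this sketch."""

    def __init__(self, key):
        self.key = key


# An alias bundles the real type with the lookup marker.
Logger = Annotated[logging.Logger, FromContext("logger")]


def handle(msg: str, logger: Logger):
    ...


def context_keys(func):
    """Map each parameter to the context key carried in its Annotated metadata."""
    keys = {}
    hints = get_type_hints(func, include_extras=True)
    for name, hint in hints.items():
        # Plain types such as str have no __metadata__ attribute.
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, FromContext):
                keys[name] = meta.key
    return keys


keys = context_keys(handle)
print(keys)
```

With this pattern the handler signature keeps an ordinary type for editors and type checkers, while the framework can still read the lookup key from the annotation metadata.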

Last update: 2023-09-21