Skip to content

Existing Fields#

Context already contains some global objects that you can always access:

  • broker - the current broker
  • context - the context itself, in which you can write your own fields
  • logger - the logger used for your broker (tags messages with message_id)
  • message - the raw message (if you need access to it)

At the same time, thanks to contextlib.ContextVar, message is local for you current consumer scope.

Access to Context Fields#

By default, the context searches for an object based on the argument name.

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

@broker_object.subscriber("test-topic")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream import Context, FastStream
from faststream.confluent import KafkaBroker

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

@broker_object.subscriber("test-topic")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker

broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)

@broker_object.subscriber("test-queue")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream import Context, FastStream
from faststream.nats import NatsBroker

broker_object = NatsBroker("nats://localhost:4222")
app = FastStream(broker_object)

@broker_object.subscriber("test-subject")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream import Context, FastStream
from faststream.redis import RedisBroker

broker_object = RedisBroker("redis://localhost:6379")
app = FastStream(broker_object)

@broker_object.subscriber("test-channel")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")

Annotated Aliases#

Also, FastStream has already created Annotated aliases to provide you with comfortable access to existing objects. You can import them directly from faststream or your broker-specific modules:

  • Shared aliases
from faststream import Logger, ContextRepo
from faststream.kafka.annotations import (
    Logger, ContextRepo, KafkaMessage,
    KafkaBroker, KafkaProducer, NoCast,
)

faststream.kafka.KafkaMessage is an alias to faststream.kafka.annotations.KafkaMessage

from faststream.kafka import KafkaMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    ContextRepo,
    KafkaMessage,
    Logger,
    KafkaBroker as BrokerAnnotation,
)

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

@broker_object.subscriber("response-topic")
async def handle_response(
    msg: str,
    logger: Logger,
    message: KafkaMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream.confluent.annotations import (
    Logger, ContextRepo, KafkaMessage,
    KafkaBroker, KafkaProducer, NoCast,
)

faststream.confluent.KafkaMessage is an alias to faststream.confluent.annotations.KafkaMessage

from faststream.confluent import KafkaMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.annotations import (
    ContextRepo,
    KafkaMessage,
    Logger,
    KafkaBroker as BrokerAnnotation,
)

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

@broker_object.subscriber("response-topic")
async def handle_response(
    msg: str,
    logger: Logger,
    message: KafkaMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream.rabbit.annotations import (
    Logger, ContextRepo, RabbitMessage,
    RabbitBroker, RabbitProducer, NoCast,
)

faststream.rabbit.RabbitMessage is an alias to faststream.rabbit.annotations.RabbitMessage

from faststream.rabbit import RabbitMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import (
    ContextRepo,
    RabbitMessage,
    Logger,
    RabbitBroker as BrokerAnnotation,
)

broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)

@broker_object.subscriber("response-queue")
async def handle_response(
    msg: str,
    logger: Logger,
    message: RabbitMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream.nats.annotations import (
    Logger, ContextRepo, NatsMessage,
    NatsBroker, NatsProducer, NatsJsProducer,
    Client, JsClient, NoCast,
)

faststream.nats.NatsMessage is an alias to faststream.nats.annotations.NatsMessage

from faststream.nats import NatsMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.nats import NatsBroker
from faststream.nats.annotations import (
    ContextRepo,
    NatsMessage,
    Logger,
    NatsBroker as BrokerAnnotation,
)

broker_object = NatsBroker("nats://localhost:4222")
app = FastStream(broker_object)

@broker_object.subscriber("response-subject")
async def handle_response(
    msg: str,
    logger: Logger,
    message: NatsMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
from faststream.redis.annotations import (
    Logger, ContextRepo, RedisMessage,
    RedisBroker, Redis, NoCast,
)

faststream.redis.RedisMessage is an alias to faststream.redis.annotations.RedisMessage

from faststream.redis import RedisMessage

To use them, simply import and use them as subscriber argument annotations.

from faststream import Context, FastStream
from faststream.redis import RedisBroker
from faststream.redis.annotations import (
    ContextRepo,
    RedisMessage,
    Logger,
    RedisBroker as BrokerAnnotation,
)

broker_object = RedisBroker("redis://localhost:6379")
app = FastStream(broker_object)

@broker_object.subscriber("response-channel")
async def handle_response(
    msg: str,
    logger: Logger,
    message: RedisMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")