Skip to content

Application Context#

FastStream has its own Dependency Injection container - Context, used to store application runtime objects and variables.

With this container, you can access both application scope and message processing scope objects. This functionality is similar to Depends usage.

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.message import KafkaMessage

# Kafka example (KafkaBroker imported from faststream.kafka above):
# create the broker for localhost:9092 and wrap it in a FastStream app.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# Subscriber registered for "test". `message` is not supplied by a caller:
# its Context() default asks the Context container for the object from the
# message processing scope.
# NOTE(review): presumably the lookup key is the parameter name `message` --
# confirm against the FastStream Context documentation.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: KafkaMessage = Context(),  # get access to raw message
):
    ...
from faststream import Context, FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.message import KafkaMessage

# Confluent example (KafkaBroker imported from faststream.confluent above):
# create the broker for localhost:9092 and wrap it in a FastStream app.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# Subscriber registered for "test". `message` is not supplied by a caller:
# its Context() default asks the Context container for the object from the
# message processing scope.
# NOTE(review): presumably the lookup key is the parameter name `message` --
# confirm against the FastStream Context documentation.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: KafkaMessage = Context(),  # get access to raw message
):
    ...
from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.message import RabbitMessage

# RabbitMQ example (RabbitBroker imported from faststream.rabbit above):
# create the broker from an AMQP URL and wrap it in a FastStream app.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


# Subscriber registered for "test". `message` is not supplied by a caller:
# its Context() default asks the Context container for the object from the
# message processing scope.
# NOTE(review): presumably the lookup key is the parameter name `message` --
# confirm against the FastStream Context documentation.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: RabbitMessage = Context(),  # get access to raw message
):
    ...
from faststream import Context, FastStream
from faststream.nats import NatsBroker
from faststream.nats.message import NatsMessage

# NATS example (NatsBroker imported from faststream.nats above):
# create the broker from a NATS URL and wrap it in a FastStream app.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


# Subscriber registered for "test". `message` is not supplied by a caller:
# its Context() default asks the Context container for the object from the
# message processing scope.
# NOTE(review): presumably the lookup key is the parameter name `message` --
# confirm against the FastStream Context documentation.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: NatsMessage = Context(),  # get access to raw message
):
    ...
from faststream import Context, FastStream
from faststream.redis import RedisBroker
from faststream.redis.message import RedisMessage

# Redis example (RedisBroker imported from faststream.redis above):
# create the broker from a Redis URL and wrap it in a FastStream app.
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


# Subscriber registered for "test". `message` is not supplied by a caller:
# its Context() default asks the Context container for the object from the
# message processing scope.
# NOTE(review): presumably the lookup key is the parameter name `message` --
# confirm against the FastStream Context documentation.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: RedisMessage = Context(),  # get access to raw message
):
    ...

With Python's Annotated feature, however, the usage is much closer to @pytest.fixture.

from typing import Annotated

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.message import KafkaMessage

# Kafka example (imports from faststream.kafka above).
# Reusable alias: the Context() marker travels with the type, so handlers
# only need to annotate a parameter with `Message`.
Message = Annotated[KafkaMessage, Context()]

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# Subscriber registered for "test". Unlike the default-value form, the
# parameter has no `= Context()` default; the Annotated alias carries it.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...
from typing import Annotated

from faststream import Context, FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.message import KafkaMessage

# Confluent example (imports from faststream.confluent above).
# Reusable alias: the Context() marker travels with the type, so handlers
# only need to annotate a parameter with `Message`.
Message = Annotated[KafkaMessage, Context()]

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# Subscriber registered for "test". Unlike the default-value form, the
# parameter has no `= Context()` default; the Annotated alias carries it.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...
from typing import Annotated

from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.message import RabbitMessage

# RabbitMQ example (imports from faststream.rabbit above).
# Reusable alias: the Context() marker travels with the type, so handlers
# only need to annotate a parameter with `Message`.
Message = Annotated[RabbitMessage, Context()]

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


# Subscriber registered for "test". Unlike the default-value form, the
# parameter has no `= Context()` default; the Annotated alias carries it.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...
from typing import Annotated

from faststream import Context, FastStream
from faststream.nats import NatsBroker
from faststream.nats.message import NatsMessage

# NATS example (imports from faststream.nats above).
# Reusable alias: the Context() marker travels with the type, so handlers
# only need to annotate a parameter with `Message`.
Message = Annotated[NatsMessage, Context()]

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


# Subscriber registered for "test". Unlike the default-value form, the
# parameter has no `= Context()` default; the Annotated alias carries it.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...
from typing import Annotated

from faststream import Context, FastStream
from faststream.redis import RedisBroker
from faststream.redis.message import RedisMessage

# Redis example (imports from faststream.redis above).
# Reusable alias: the Context() marker travels with the type, so handlers
# only need to annotate a parameter with `Message`.
Message = Annotated[RedisMessage, Context()]

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


# Subscriber registered for "test". Unlike the default-value form, the
# parameter has no `= Context()` default; the Annotated alias carries it.
@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...

Usages#

By default, the context is available in the same places as Depends:

  • in lifespan hooks
  • in message subscribers
  • in nested dependencies

Tip

Fields obtained from the Context are editable, so editing them in a function means editing them everywhere.

Compatibility with Regular Functions#

To use context in other functions, use the @apply_types decorator. In this case, the context of the called function will correspond to the context of the event handler from which it was called.

from logging import Logger

from faststream import Context, apply_types
from faststream.rabbit import RabbitBroker

# The broker must exist before its `subscriber` decorator can be used; the
# URL is the same one used by the RabbitMQ example earlier on this page.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")


# Subscriber registered for "test". It forwards the body to a regular
# function and deliberately passes no logger: `nested_func` (defined below
# with @apply_types) takes it from the Context of this handler's call.
@broker.subscriber("test")
async def handler(body):
    nested_func(body)

# @apply_types makes Context() usable in a regular (non-subscriber) function.
# When this is called from an event handler, `logger` is taken from that
# handler's context instead of being passed explicitly by the caller.
@apply_types
def nested_func(body, logger: Logger = Context()):
    logger.info(body)

In the example above, we did not pass the logger argument explicitly; it was used from the existing Context.