Skip to content

Application Context#

FastStreams has its own Dependency Injection container - Context, used to store application runtime objects and variables.

With this container, you can access both application scope and message processing scope objects. This functionality is similar to Depends usage.

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message=Context(),  # get access to raw message
):
    ...
from faststream import Context, FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message=Context(),  # get access to raw message
):
    ...
from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message=Context(),  # get access to raw message
):
    ...
from faststream import Context, FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message=Context(),  # get access to raw message
):
    ...
from faststream import Context, FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message=Context(),  # get access to raw message
):
    ...

But, with the Annotated Python feature usage, it is much closer to @pytest.fixture.

from typing import Annotated

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.message import KafkaMessage

Message = Annotated[KafkaMessage, Context()]

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...
from typing import Annotated

from faststream import Context, FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.message import KafkaMessage

Message = Annotated[KafkaMessage, Context()]

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...
from typing import Annotated

from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.message import RabbitMessage

Message = Annotated[RabbitMessage, Context()]

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...
from typing import Annotated

from faststream import Context, FastStream
from faststream.nats import NatsBroker
from faststream.nats.message import NatsMessage

Message = Annotated[NatsMessage, Context()]

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...
from typing import Annotated

from faststream import Context, FastStream
from faststream.redis import RedisBroker
from faststream.redis.message import RedisMessage

Message = Annotated[RedisMessage, Context()]

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(
    body: str,
    message: Message,  # get access to raw message
):
    ...

Usages#

By default, the context is available in the same place as Depends:

  • at lifespan hooks
  • message subscribers
  • nested dependencies

Tip

Fields obtained from the Context are editable, so editing them in a function means editing them everywhere.

Compatibility with Regular Functions#

To use context in other functions, use the @apply_types decorator. In this case, the context of the called function will correspond to the context of the event handler from which it was called.

1
2
3
4
5
6
7
8
9
from faststream import Context, apply_types

@broker.subscriber("test")
async def handler(body):
    nested_func(body)

@apply_types
def nested_func(body, logger=Context()):
    logger.info(body)

In the example above, we did not pass the logger function at calling it; it was placed from context.