Skip to content

Context Fields Declaration#

You can also store your own objects in the Context.

Global#

To declare an application-level context field, call the context.set_global method with a key that indicates where the object will be placed in the context.

from faststream import FastStream, ContextRepo, Context
from faststream.kafka import KafkaBroker

# Kafka broker and the application object whose startup hook is used below.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Store the ``secret_str`` field in the global context on application startup."""
    context.set_global("secret_str", "my-perfect-secret")
from faststream import FastStream, ContextRepo, Context
from faststream.rabbit import RabbitBroker

# RabbitMQ broker and the application object whose startup hook is used below.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Store the ``secret_str`` field in the global context on application startup."""
    context.set_global("secret_str", "my-perfect-secret")
from faststream import FastStream, ContextRepo, Context
from faststream.nats import NatsBroker

# NATS broker and the application object whose startup hook is used below.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Store the ``secret_str`` field in the global context on application startup."""
    context.set_global("secret_str", "my-perfect-secret")

Afterward, you can access your secret field in the usual way:

Kafka:
# ``secret_str`` is not part of the message payload: its ``Context()`` default
# asks for the global context field registered under that name at startup.
@broker.subscriber("test-topic")
async def handle(msg: str, secret_str: str = Context()):
    assert secret_str == "my-perfect-secret"  # pragma: allowlist secret
RabbitMQ:
# ``secret_str`` is not part of the message payload: its ``Context()`` default
# asks for the global context field registered under that name at startup.
@broker.subscriber("test-queue")
async def handle(msg: str, secret_str: str = Context()):
    assert secret_str == "my-perfect-secret"  # pragma: allowlist secret
NATS:
# ``secret_str`` is not part of the message payload: its ``Context()`` default
# asks for the global context field registered under that name at startup.
@broker.subscriber("test-subject")
async def handle(msg: str, secret_str: str = Context()):
    assert secret_str == "my-perfect-secret"  # pragma: allowlist secret

In this case, the field becomes a global context field: unlike message, it does not depend on the current message handler.

To remove a field from the context use the reset_global method:

context.reset_global("my_key")

Local#

To set a local context field (available only within the message processing scope), use the context.scope context manager:

from faststream import Context, FastStream, apply_types
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import ContextRepo, KafkaMessage

# Kafka broker and application used by the scoped-context example.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    message: KafkaMessage,
    context: ContextRepo,
):
    # Publish the incoming message's correlation id under "correlation_id"
    # for the duration of the ``with`` block; ``call`` reads it from there.
    with context.scope("correlation_id", message.correlation_id):
        call()


@apply_types
def call(
    message: KafkaMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message.

    ``handle`` calls this function without arguments, so both parameters are
    expected to be supplied from the context by ``apply_types``
    (NOTE(review): confirm against the FastStream dependency-injection docs).
    """
    assert correlation_id == message.correlation_id
from faststream import Context, FastStream, apply_types
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import ContextRepo, RabbitMessage

# RabbitMQ broker and application used by the scoped-context example.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle(
    msg: str,
    message: RabbitMessage,
    context: ContextRepo,
):
    # Publish the incoming message's correlation id under "correlation_id"
    # for the duration of the ``with`` block; ``call`` reads it from there.
    with context.scope("correlation_id", message.correlation_id):
        call()


@apply_types
def call(
    message: RabbitMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message.

    ``handle`` calls this function without arguments, so both parameters are
    expected to be supplied from the context by ``apply_types``
    (NOTE(review): confirm against the FastStream dependency-injection docs).
    """
    assert correlation_id == message.correlation_id
from faststream import Context, FastStream, apply_types
from faststream.nats import NatsBroker
from faststream.nats.annotations import ContextRepo, NatsMessage

# NATS broker and application used by the scoped-context example.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle(
    msg: str,
    message: NatsMessage,
    context: ContextRepo,
):
    # Publish the incoming message's correlation id under "correlation_id"
    # for the duration of the ``with`` block; ``call`` reads it from there.
    with context.scope("correlation_id", message.correlation_id):
        call()


@apply_types
def call(
    message: NatsMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message.

    ``handle`` calls this function without arguments, so both parameters are
    expected to be supplied from the context by ``apply_types``
    (NOTE(review): confirm against the FastStream dependency-injection docs).
    """
    assert correlation_id == message.correlation_id

You can also set the context yourself, and it will remain within the current call stack until you clear it.

from faststream import Context, FastStream, apply_types, context
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import KafkaMessage

# Kafka broker and application used by the manual set/reset example.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    message: KafkaMessage,
):
    # set_local returns a tag that has to be passed back to reset_local;
    # it is handed to ``call``, which performs the reset.
    tag = context.set_local("correlation_id", message.correlation_id)
    call(tag)


@apply_types
def call(
    tag,
    message: KafkaMessage,
    correlation_id=Context(),
):
    """Verify the locally set ``correlation_id`` and then clear it.

    Args:
        tag: Value returned by ``context.set_local`` in ``handle``; passed back
            to ``context.reset_local`` to undo that assignment.
        message: The message being processed (not passed by ``handle``, so it
            is expected to come from the context).
        correlation_id: The ``correlation_id`` context field.

    Raises:
        AssertionError: If the context value differs from the message's
            correlation id. The local field is still cleared in that case.
    """
    try:
        assert correlation_id == message.correlation_id
    finally:
        # Clear the field even when the check fails, so the value set by
        # ``handle`` cannot linger in the context after this call.
        context.reset_local("correlation_id", tag)
from faststream import Context, FastStream, apply_types, context
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import RabbitMessage

# RabbitMQ broker and application used by the manual set/reset example.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle(
    msg: str,
    message: RabbitMessage,
):
    # set_local returns a tag that has to be passed back to reset_local;
    # it is handed to ``call``, which performs the reset.
    tag = context.set_local("correlation_id", message.correlation_id)
    call(tag)


@apply_types
def call(
    tag,
    message: RabbitMessage,
    correlation_id=Context(),
):
    """Verify the locally set ``correlation_id`` and then clear it.

    Args:
        tag: Value returned by ``context.set_local`` in ``handle``; passed back
            to ``context.reset_local`` to undo that assignment.
        message: The message being processed (not passed by ``handle``, so it
            is expected to come from the context).
        correlation_id: The ``correlation_id`` context field.

    Raises:
        AssertionError: If the context value differs from the message's
            correlation id. The local field is still cleared in that case.
    """
    try:
        assert correlation_id == message.correlation_id
    finally:
        # Clear the field even when the check fails, so the value set by
        # ``handle`` cannot linger in the context after this call.
        context.reset_local("correlation_id", tag)
from faststream import Context, FastStream, apply_types, context
from faststream.nats import NatsBroker
from faststream.nats.annotations import NatsMessage

# NATS broker and application used by the manual set/reset example.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle(
    msg: str,
    message: NatsMessage,
):
    # set_local returns a tag that has to be passed back to reset_local;
    # it is handed to ``call``, which performs the reset.
    tag = context.set_local("correlation_id", message.correlation_id)
    call(tag)


@apply_types
def call(
    tag,
    message: NatsMessage,
    correlation_id=Context(),
):
    """Verify the locally set ``correlation_id`` and then clear it.

    Args:
        tag: Value returned by ``context.set_local`` in ``handle``; passed back
            to ``context.reset_local`` to undo that assignment.
        message: The message being processed (not passed by ``handle``, so it
            is expected to come from the context).
        correlation_id: The ``correlation_id`` context field.

    Raises:
        AssertionError: If the context value differs from the message's
            correlation id. The local field is still cleared in that case.
    """
    try:
        assert correlation_id == message.correlation_id
    finally:
        # Clear the field even when the check fails, so the value set by
        # ``handle`` cannot linger in the context after this call.
        context.reset_local("correlation_id", tag)

Last update: 2023-09-21