Skip to content

Context Extra Options#

Additionally, Context provides some extra options for working with the objects stored in the context.

Default Values#

For instance, if you attempt to access a field that doesn't exist in the global context, you will receive a pydantic.ValidationError exception.

However, you can avoid this by passing a default value through the default argument of Context.

1
2
3
4
5
@broker.subscriber("test-topic")
async def handle(
    not_existed: None = Context("not_existed", default=None),
):
    assert not_existed is None
1
2
3
4
5
@broker.subscriber("test-topic")
async def handle(
    not_existed: None = Context("not_existed", default=None),
):
    assert not_existed is None
1
2
3
4
5
@broker.subscriber("test-queue")
async def handle(
    not_existed: None = Context("not_existed", default=None),
):
    assert not_existed is None
1
2
3
4
5
@broker.subscriber("test-subject")
async def handle(
    not_existed: None = Context("not_existed", default=None),
):
    assert not_existed is None
1
2
3
4
5
@broker.subscriber("test-channel")
async def handle(
    not_existed: None = Context("not_existed", default=None),
):
    assert not_existed is None

Cast Context Types#

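By default, context fields are NOT CAST to the type specified in their annotation. In the example below, secret is annotated as int but is still received as the string "1" that was stored in the context.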

from faststream import Context, FastStream, context
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
context.set_global("secret", "1")

@broker.subscriber("test-topic")
async def handle(
    secret: int = Context(),
):
    assert secret == "1"
from faststream import Context, FastStream, context
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
context.set_global("secret", "1")

@broker.subscriber("test-topic")
async def handle(
    secret: int = Context(),
):
    assert secret == "1"
from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
context.set_global("secret", "1")

@broker.subscriber("test-queue")
async def handle(
    secret: int = Context(),
):
    assert secret == "1"
from faststream import Context, FastStream, context
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)
context.set_global("secret", "1")

@broker.subscriber("test-subject")
async def handle(
    secret: int = Context(),
):
    assert secret == "1"
from faststream import Context, FastStream, context
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)
context.set_global("secret", "1")

@broker.subscriber("test-channel")
async def handle(
    secret: int = Context(),
):
    assert secret == "1"

If you need the value to be cast to the annotated type, enable it by passing cast=True to Context.

1
2
3
4
5
@broker.subscriber("test-topic2")
async def handle_int(
    secret: int = Context(cast=True),
):
    assert secret == 1
1
2
3
4
5
@broker.subscriber("test-topic2")
async def handle_int(
    secret: int = Context(cast=True),
):
    assert secret == 1
1
2
3
4
5
@broker.subscriber("test-queue2")
async def handle_int(
    secret: int = Context(cast=True),
):
    assert secret == 1
1
2
3
4
5
@broker.subscriber("test-subject2")
async def handle_int(
    secret: int = Context(cast=True),
):
    assert secret == 1
1
2
3
4
5
@broker.subscriber("test-channel2")
async def handle_int(
    secret: int = Context(cast=True),
):
    assert secret == 1

Initial Value#

Context also provides an initial option, which lets you set up a base context value without a prior set_global call (for example, initial=list).

1
2
3
4
5
6
@broker.subscriber("test-topic")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    collector.append(msg)
1
2
3
4
5
6
@broker.subscriber("test-topic")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    collector.append(msg)
1
2
3
4
5
6
@broker.subscriber("test-queue")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    collector.append(msg)
1
2
3
4
5
6
@broker.subscriber("test-subject")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    collector.append(msg)
1
2
3
4
5
6
@broker.subscriber("test-channel")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    collector.append(msg)