Additionally, Context
provides you with some extra capabilities for working with containing objects.
Default Values
For instance, if you attempt to access a field that doesn't exist in the global context, you will receive a pydantic.ValidationError
exception.
However, you can set default values if needed.
| @broker.subscriber("test-topic")
async def handle(
not_existed: None = Context("not_existed", default=None),
):
assert not_existed is None
|
| @broker.subscriber("test-topic")
async def handle(
not_existed: None = Context("not_existed", default=None),
):
assert not_existed is None
|
| @broker.subscriber("test-queue")
async def handle(
not_existed: None = Context("not_existed", default=None),
):
assert not_existed is None
|
| @broker.subscriber("test-subject")
async def handle(
not_existed: None = Context("not_existed", default=None),
):
assert not_existed is None
|
| @broker.subscriber("test-channel")
async def handle(
not_existed: None = Context("not_existed", default=None),
):
assert not_existed is None
|
Cast Context Types
By default, context fields are NOT CAST to the type specified in their annotation.
| from faststream import Context, FastStream, context
from faststream.kafka import KafkaBroker
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
context.set_global("secret", "1")
@broker.subscriber("test-topic")
async def handle(
secret: int = Context(),
):
assert secret == "1"
|
| from faststream import Context, FastStream, context
from faststream.confluent import KafkaBroker
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
context.set_global("secret", "1")
@broker.subscriber("test-topic")
async def handle(
secret: int = Context(),
):
assert secret == "1"
|
| from faststream import Context, FastStream, context
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
context.set_global("secret", "1")
@broker.subscriber("test-queue")
async def handle(
secret: int = Context(),
):
assert secret == "1"
|
| from faststream import Context, FastStream, context
from faststream.nats import NatsBroker
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)
context.set_global("secret", "1")
@broker.subscriber("test-subject")
async def handle(
secret: int = Context(),
):
assert secret == "1"
|
| from faststream import Context, FastStream, context
from faststream.redis import RedisBroker
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)
context.set_global("secret", "1")
@broker.subscriber("test-channel")
async def handle(
secret: int = Context(),
):
assert secret == "1"
|
If you require this functionality, you can enable the appropriate flag.
Initial Value
Also, Context
provides you with a initial
option to setup base context value without previous set_global
call.
| @broker.subscriber("test-topic")
async def handle(
msg: str,
collector: list[str] = Context(initial=list),
):
collector.append(msg)
|
| @broker.subscriber("test-topic")
async def handle(
msg: str,
collector: list[str] = Context(initial=list),
):
collector.append(msg)
|
| @broker.subscriber("test-queue")
async def handle(
msg: str,
collector: list[str] = Context(initial=list),
):
collector.append(msg)
|
| @broker.subscriber("test-subject")
async def handle(
msg: str,
collector: list[str] = Context(initial=list),
):
collector.append(msg)
|
| @broker.subscriber("test-channel")
async def handle(
msg: str,
collector: list[str] = Context(initial=list),
):
collector.append(msg)
|