Context Fields Declaration You can also store your own objects in the Context
.
Global To declare an application-level context field, you need to call the context.set_global
method with with a key to indicate where the object will be placed in the context.
AIOKafka Confluent RabbitMQ NATS Redis
from faststream import FastStream , ContextRepo , Context
from faststream.kafka import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
@app . on_startup
async def set_global ( context : ContextRepo ):
context . set_global ( "secret_str" , "my-perfect-secret" )
from faststream import FastStream , ContextRepo , Context
from faststream.confluent import KafkaBroker
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
@app . on_startup
async def set_global ( context : ContextRepo ):
context . set_global ( "secret_str" , "my-perfect-secret" )
from faststream import FastStream , ContextRepo , Context
from faststream.rabbit import RabbitBroker
broker = RabbitBroker ( "amqp://guest:guest@localhost:5672/" )
app = FastStream ( broker )
@app . on_startup
async def set_global ( context : ContextRepo ):
context . set_global ( "secret_str" , "my-perfect-secret" )
from faststream import FastStream , ContextRepo , Context
from faststream.nats import NatsBroker
broker = NatsBroker ( "nats://localhost:4222" )
app = FastStream ( broker )
@app . on_startup
async def set_global ( context : ContextRepo ):
context . set_global ( "secret_str" , "my-perfect-secret" )
from faststream import FastStream , ContextRepo , Context
from faststream.redis import RedisBroker
broker = RedisBroker ( "redis://localhost:6379" )
app = FastStream ( broker )
@app . on_startup
async def set_global ( context : ContextRepo ):
context . set_global ( "secret_str" , "my-perfect-secret" )
Afterward, you can access your secret
field in the usual way:
AIOKafka Confluent RabbitMQ NATS Redis
@broker . subscriber ( "test-topic" )
async def handle (
msg : str ,
secret_str : str = Context (),
):
assert secret_str == "my-perfect-secret"
@broker . subscriber ( "test-topic" )
async def handle (
msg : str ,
secret_str : str = Context (),
):
assert secret_str == "my-perfect-secret"
@broker . subscriber ( "test-queue" )
async def handle (
msg : str ,
secret_str : str = Context (),
):
assert secret_str == "my-perfect-secret"
@broker . subscriber ( "test-subject" )
async def handle (
msg : str ,
secret_str : str = Context (),
):
assert secret_str == "my-perfect-secret"
@broker . subscriber ( "test-channel" )
async def handle (
msg : str ,
secret_str : str = Context (),
):
assert secret_str == "my-perfect-secret"
In this case, the field becomes a global context field: it does not depend on the current message handler (unlike message
)
To remove a field from the context use the reset_global
method:
context . reset_global ( "my_key" )
Local To set a local context (available only within the message processing scope), use the context manager scope
AIOKafka Confluent RabbitMQ NATS Redis
from faststream import Context , FastStream , apply_types
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import ContextRepo , KafkaMessage
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
@broker . subscriber ( "test-topic" )
async def handle (
msg : str ,
message : KafkaMessage ,
context : ContextRepo ,
):
with context . scope ( "correlation_id" , message . correlation_id ):
call ()
@apply_types
def call (
message : KafkaMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
from faststream import Context , FastStream , apply_types
from faststream.confluent import KafkaBroker
from faststream.confluent.annotations import ContextRepo , KafkaMessage
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
@broker . subscriber ( "test-topic" )
async def handle (
msg : str ,
message : KafkaMessage ,
context : ContextRepo ,
):
with context . scope ( "correlation_id" , message . correlation_id ):
call ()
@apply_types
def call (
message : KafkaMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
from faststream import Context , FastStream , apply_types
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import ContextRepo , RabbitMessage
broker = RabbitBroker ( "amqp://guest:guest@localhost:5672/" )
app = FastStream ( broker )
@broker . subscriber ( "test-queue" )
async def handle (
msg : str ,
message : RabbitMessage ,
context : ContextRepo ,
):
with context . scope ( "correlation_id" , message . correlation_id ):
call ()
@apply_types
def call (
message : RabbitMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
from faststream import Context , FastStream , apply_types
from faststream.nats import NatsBroker
from faststream.nats.annotations import ContextRepo , NatsMessage
broker = NatsBroker ( "nats://localhost:4222" )
app = FastStream ( broker )
@broker . subscriber ( "test-subject" )
async def handle (
msg : str ,
message : NatsMessage ,
context : ContextRepo ,
):
with context . scope ( "correlation_id" , message . correlation_id ):
call ()
@apply_types
def call (
message : NatsMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
from faststream import Context , FastStream , apply_types
from faststream.redis import RedisBroker
from faststream.redis.annotations import ContextRepo , RedisMessage
broker = RedisBroker ( "redis://localhost:6379" )
app = FastStream ( broker )
@broker . subscriber ( "test-channel" )
async def handle (
msg : str ,
message : RedisMessage ,
context : ContextRepo ,
):
with context . scope ( "correlation_id" , message . correlation_id ):
call ()
@apply_types
def call (
message : RedisMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
You can also set the context by yourself, and it will remain within the current call stack until you clear it.
AIOKafka Confluent RabbitMQ NATS Redis
from faststream import Context , FastStream , apply_types , context
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import KafkaMessage
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
@broker . subscriber ( "test-topic" )
async def handle (
msg : str ,
message : KafkaMessage ,
):
tag = context . set_local ( "correlation_id" , message . correlation_id )
call ( tag )
@apply_types
def call (
tag ,
message : KafkaMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
context . reset_local ( "correlation_id" , tag )
from faststream import Context , FastStream , apply_types , context
from faststream.confluent import KafkaBroker
from faststream.confluent.annotations import KafkaMessage
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
@broker . subscriber ( "test-topic" )
async def handle (
msg : str ,
message : KafkaMessage ,
):
tag = context . set_local ( "correlation_id" , message . correlation_id )
call ( tag )
@apply_types
def call (
tag ,
message : KafkaMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
context . reset_local ( "correlation_id" , tag )
from faststream import Context , FastStream , apply_types , context
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import RabbitMessage
broker = RabbitBroker ( "amqp://guest:guest@localhost:5672/" )
app = FastStream ( broker )
@broker . subscriber ( "test-queue" )
async def handle (
msg : str ,
message : RabbitMessage ,
):
tag = context . set_local ( "correlation_id" , message . correlation_id )
call ( tag )
@apply_types
def call (
tag ,
message : RabbitMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
context . reset_local ( "correlation_id" , tag )
from faststream import Context , FastStream , apply_types , context
from faststream.nats import NatsBroker
from faststream.nats.annotations import NatsMessage
broker = NatsBroker ( "nats://localhost:4222" )
app = FastStream ( broker )
@broker . subscriber ( "test-subject" )
async def handle (
msg : str ,
message : NatsMessage ,
):
tag = context . set_local ( "correlation_id" , message . correlation_id )
call ( tag )
@apply_types
def call (
tag ,
message : NatsMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
context . reset_local ( "correlation_id" , tag )
from faststream import Context , FastStream , apply_types , context
from faststream.redis import RedisBroker
from faststream.redis.annotations import RedisMessage
broker = RedisBroker ( "redis://localhost:6379" )
app = FastStream ( broker )
@broker . subscriber ( "test-channel" )
async def handle (
msg : str ,
message : RedisMessage ,
):
tag = context . set_local ( "correlation_id" , message . correlation_id )
call ( tag )
@apply_types
def call (
tag ,
message : RedisMessage ,
correlation_id = Context (),
):
assert correlation_id == message . correlation_id
context . reset_local ( "correlation_id" , tag )