Context Fields Declaration
You can also store your own objects in the Context.
Global
To declare an application-level context field, call the context.set_global method with a key indicating where the object will be placed in the context, followed by the object to store.
Kafka RabbitMQ NATS Redis
from faststream import FastStream , ContextRepo , Context
from faststream.kafka import KafkaBroker
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Register the application-level ``secret_str`` field at startup.

    Args:
        context: Context repository supplied to the startup hook.
    """
    # Stored under the "secret_str" key as a global context field.
    context.set_global("secret_str", "my-perfect-secret")
from faststream import FastStream , ContextRepo , Context
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Register the application-level ``secret_str`` field at startup.

    Args:
        context: Context repository supplied to the startup hook.
    """
    # Stored under the "secret_str" key as a global context field.
    context.set_global("secret_str", "my-perfect-secret")
from faststream import FastStream , ContextRepo , Context
from faststream.nats import NatsBroker
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Register the application-level ``secret_str`` field at startup.

    Args:
        context: Context repository supplied to the startup hook.
    """
    # Stored under the "secret_str" key as a global context field.
    context.set_global("secret_str", "my-perfect-secret")
from faststream import FastStream , ContextRepo , Context
from faststream.redis import RedisBroker
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@app.on_startup
async def set_global(context: ContextRepo):
    """Register the application-level ``secret_str`` field at startup.

    Args:
        context: Context repository supplied to the startup hook.
    """
    # Stored under the "secret_str" key as a global context field.
    context.set_global("secret_str", "my-perfect-secret")
Afterward, you can access your secret_str field in the usual way, for example by declaring a handler argument such as secret_str: str = Context().
In this case, the field becomes a global context field: it does not depend on the current message handler (unlike message).
To remove a field from the context, use the reset_global method:
# Drop the field stored under "my_key" from the global context.
context.reset_global("my_key")
Local
To set a local context (available only within the message processing scope), use the context.scope context manager:
Kafka RabbitMQ NATS Redis
from faststream import Context , FastStream , apply_types
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import ContextRepo , KafkaMessage
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    message: KafkaMessage,
    context: ContextRepo,
):
    """Handle a ``test-topic`` message with a locally scoped ``correlation_id``.

    Args:
        msg: Message body.
        message: The Kafka message being processed.
        context: Context repository used to open the local scope.
    """
    # The field is set for the duration of the ``with`` block only.
    with context.scope("correlation_id", message.correlation_id):
        call()


@apply_types
def call(
    message: KafkaMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message.

    ``handle`` invokes this function without arguments, so both parameters
    are expected to be filled in from the context by ``apply_types``.

    Args:
        message: The message currently being processed.
        correlation_id: Value taken from the context field of the same name.
    """
    assert correlation_id == message.correlation_id
from faststream import Context , FastStream , apply_types
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import ContextRepo , RabbitMessage
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle(
    msg: str,
    message: RabbitMessage,
    context: ContextRepo,
):
    """Handle a ``test-queue`` message with a locally scoped ``correlation_id``.

    Args:
        msg: Message body.
        message: The RabbitMQ message being processed.
        context: Context repository used to open the local scope.
    """
    # The field is set for the duration of the ``with`` block only.
    with context.scope("correlation_id", message.correlation_id):
        call()


@apply_types
def call(
    message: RabbitMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message.

    ``handle`` invokes this function without arguments, so both parameters
    are expected to be filled in from the context by ``apply_types``.

    Args:
        message: The message currently being processed.
        correlation_id: Value taken from the context field of the same name.
    """
    assert correlation_id == message.correlation_id
from faststream import Context , FastStream , apply_types
from faststream.nats import NatsBroker
from faststream.nats.annotations import ContextRepo , NatsMessage
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle(
    msg: str,
    message: NatsMessage,
    context: ContextRepo,
):
    """Handle a ``test-subject`` message with a locally scoped ``correlation_id``.

    Args:
        msg: Message body.
        message: The NATS message being processed.
        context: Context repository used to open the local scope.
    """
    # The field is set for the duration of the ``with`` block only.
    with context.scope("correlation_id", message.correlation_id):
        call()


@apply_types
def call(
    message: NatsMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message.

    ``handle`` invokes this function without arguments, so both parameters
    are expected to be filled in from the context by ``apply_types``.

    Args:
        message: The message currently being processed.
        correlation_id: Value taken from the context field of the same name.
    """
    assert correlation_id == message.correlation_id
from faststream import Context , FastStream , apply_types
from faststream.redis import RedisBroker
from faststream.redis.annotations import ContextRepo , RedisMessage
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle(
    msg: str,
    message: RedisMessage,
    context: ContextRepo,
):
    """Handle a ``test-channel`` message with a locally scoped ``correlation_id``.

    Args:
        msg: Message body.
        message: The Redis message being processed.
        context: Context repository used to open the local scope.
    """
    # The field is set for the duration of the ``with`` block only.
    with context.scope("correlation_id", message.correlation_id):
        call()


@apply_types
def call(
    message: RedisMessage,
    correlation_id=Context(),
):
    """Check that the scoped ``correlation_id`` matches the current message.

    ``handle`` invokes this function without arguments, so both parameters
    are expected to be filled in from the context by ``apply_types``.

    Args:
        message: The message currently being processed.
        correlation_id: Value taken from the context field of the same name.
    """
    assert correlation_id == message.correlation_id
You can also set the context by yourself, and it will remain within the current call stack until you clear it.
Kafka RabbitMQ NATS Redis
from faststream import Context , FastStream , apply_types , context
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import KafkaMessage
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    message: KafkaMessage,
):
    """Handle a ``test-topic`` message, setting ``correlation_id`` by hand.

    Args:
        msg: Message body.
        message: The Kafka message being processed.
    """
    # set_local returns a tag that is needed later to undo the assignment.
    tag = context.set_local("correlation_id", message.correlation_id)
    call(tag)


@apply_types
def call(
    tag,
    message: KafkaMessage,
    correlation_id=Context(),
):
    """Check the manually set ``correlation_id`` and then clear it.

    ``handle`` passes only ``tag``; the remaining parameters are expected to
    be filled in from the context by ``apply_types``.

    Args:
        tag: Value returned by ``context.set_local`` for this field.
        message: The message currently being processed.
        correlation_id: Value taken from the context field of the same name.
    """
    try:
        assert correlation_id == message.correlation_id
    finally:
        # Clear the field even when the check fails, so it does not stay set.
        context.reset_local("correlation_id", tag)
from faststream import Context , FastStream , apply_types , context
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import RabbitMessage
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle(
    msg: str,
    message: RabbitMessage,
):
    """Handle a ``test-queue`` message, setting ``correlation_id`` by hand.

    Args:
        msg: Message body.
        message: The RabbitMQ message being processed.
    """
    # set_local returns a tag that is needed later to undo the assignment.
    tag = context.set_local("correlation_id", message.correlation_id)
    call(tag)


@apply_types
def call(
    tag,
    message: RabbitMessage,
    correlation_id=Context(),
):
    """Check the manually set ``correlation_id`` and then clear it.

    ``handle`` passes only ``tag``; the remaining parameters are expected to
    be filled in from the context by ``apply_types``.

    Args:
        tag: Value returned by ``context.set_local`` for this field.
        message: The message currently being processed.
        correlation_id: Value taken from the context field of the same name.
    """
    try:
        assert correlation_id == message.correlation_id
    finally:
        # Clear the field even when the check fails, so it does not stay set.
        context.reset_local("correlation_id", tag)
from faststream import Context , FastStream , apply_types , context
from faststream.nats import NatsBroker
from faststream.nats.annotations import NatsMessage
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle(
    msg: str,
    message: NatsMessage,
):
    """Handle a ``test-subject`` message, setting ``correlation_id`` by hand.

    Args:
        msg: Message body.
        message: The NATS message being processed.
    """
    # set_local returns a tag that is needed later to undo the assignment.
    tag = context.set_local("correlation_id", message.correlation_id)
    call(tag)


@apply_types
def call(
    tag,
    message: NatsMessage,
    correlation_id=Context(),
):
    """Check the manually set ``correlation_id`` and then clear it.

    ``handle`` passes only ``tag``; the remaining parameters are expected to
    be filled in from the context by ``apply_types``.

    Args:
        tag: Value returned by ``context.set_local`` for this field.
        message: The message currently being processed.
        correlation_id: Value taken from the context field of the same name.
    """
    try:
        assert correlation_id == message.correlation_id
    finally:
        # Clear the field even when the check fails, so it does not stay set.
        context.reset_local("correlation_id", tag)
from faststream import Context , FastStream , apply_types , context
from faststream.redis import RedisBroker
from faststream.redis.annotations import RedisMessage
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle(
    msg: str,
    message: RedisMessage,
):
    """Handle a ``test-channel`` message, setting ``correlation_id`` by hand.

    Args:
        msg: Message body.
        message: The Redis message being processed.
    """
    # set_local returns a tag that is needed later to undo the assignment.
    tag = context.set_local("correlation_id", message.correlation_id)
    call(tag)


@apply_types
def call(
    tag,
    message: RedisMessage,
    correlation_id=Context(),
):
    """Check the manually set ``correlation_id`` and then clear it.

    ``handle`` passes only ``tag``; the remaining parameters are expected to
    be filled in from the context by ``apply_types``.

    Args:
        tag: Value returned by ``context.set_local`` for this field.
        message: The message currently being processed.
        correlation_id: Value taken from the context field of the same name.
    """
    try:
        assert correlation_id == message.correlation_id
    finally:
        # Clear the field even when the check fails, so it does not stay set.
        context.reset_local("correlation_id", tag)