Existing Fields
Context already contains some global objects that you can always access:
broker - the current broker
context - the context itself, in which you can write your own fields
logger - the logger used for your broker (tags messages with message_id)
message - the raw message (if you need access to it)
At the same time, thanks to contextvars.ContextVar, message is local to your current consumer scope.
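The scoping behaviour can be illustrated with the standard library alone. The sketch below is not FastStream code: `current_message` and `consume` are hypothetical names, and it only assumes that each consumer runs as its own asyncio task, which receives its own copy of the context.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical stand-in for the per-message slot; not FastStream's own variable.
current_message: ContextVar[str] = ContextVar("current_message", default="<none>")


async def consume(raw: str, seen: dict[str, str]) -> None:
    # Each asyncio task works on its own copy of the context,
    # so this set() is invisible to the other consumers.
    current_message.set(raw)
    await asyncio.sleep(0)  # yield control so the tasks interleave
    seen[raw] = current_message.get()


async def main() -> dict[str, str]:
    seen: dict[str, str] = {}
    # gather() wraps every coroutine in a separate task.
    await asyncio.gather(*(consume(m, seen) for m in ("a", "b", "c")))
    return seen


seen = asyncio.run(main())
```

Every task reads back the value it set itself, and the module-level context still holds the default afterwards, which is the kind of isolation the message field relies on.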
Access to Context Fields
By default, the context searches for an object based on the argument name.
Kafka

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

@broker_object.subscriber("test-topic")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")

RabbitMQ

from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker

broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)

@broker_object.subscriber("test-queue")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),
):
    logger.info(message)
    await broker.publish("test", "response")
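To make the lookup-by-argument-name idea concrete, here is a minimal standard-library sketch. It is not how FastStream is implemented: `Marker`, `call_with_context`, and the `store` dictionary are hypothetical names introduced only for illustration.

```python
import inspect


class Marker:
    """Hypothetical sentinel playing the role of Context() in this sketch."""


def call_with_context(func, store, *args):
    # Fill every parameter whose default is a Marker with store[<parameter name>].
    injected = {
        name: store[name]
        for name, param in inspect.signature(func).parameters.items()
        if isinstance(param.default, Marker)
    }
    return func(*args, **injected)


def handle(msg, logger=Marker(), broker=Marker()):
    return f"{logger} got {msg!r} via {broker}"


# The keys match the argument names of handle(); "unused" is simply ignored.
store = {"logger": "app-logger", "broker": "kafka-broker", "unused": 1}
result = call_with_context(handle, store, "hello")
```

The point of the sketch is that the argument name itself acts as the lookup key, so renaming `logger` to something else would change which object is requested.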
Annotated Aliases
FastStream also provides ready-made Annotated aliases that give you convenient access to these existing objects. You can import them directly from faststream or from your broker-specific modules:
from faststream import Logger, ContextRepo

from faststream.kafka.annotations import (
    Logger, ContextRepo, KafkaMessage, KafkaBroker, KafkaProducer
)

from faststream.rabbit.annotations import (
    Logger, ContextRepo, RabbitMessage, RabbitBroker, RabbitProducer
)
To use them, simply import and use them as subscriber argument annotations.
Kafka

from faststream import Context, FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    ContextRepo,
    KafkaMessage,
    Logger,
    KafkaBroker as BrokerAnnotation,
)

broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)

@broker_object.subscriber("response-topic")
async def handle_response(
    msg: str,
    logger: Logger,
    message: KafkaMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")

RabbitMQ

from faststream import Context, FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import (
    ContextRepo,
    RabbitMessage,
    Logger,
    RabbitBroker as BrokerAnnotation,
)

broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)

@broker_object.subscriber("response-queue")
async def handle_response(
    msg: str,
    logger: Logger,
    message: RabbitMessage,
    context: ContextRepo,
    broker: BrokerAnnotation,
):
    logger.info(message)
    await broker.publish("test", "response")
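For readers unfamiliar with typing.Annotated, the following sketch shows the general pattern such aliases rely on: a type for the checker plus metadata that names what to inject. It is an illustration only, not FastStream's implementation; `FromContext`, `LoggerName`, `BrokerName`, and `resolve` are hypothetical names.

```python
from typing import Annotated, get_type_hints


class FromContext:
    """Hypothetical marker carrying the context key to look up."""

    def __init__(self, key: str) -> None:
        self.key = key


# Illustrative aliases: a type for the checker plus a lookup key in the metadata.
LoggerName = Annotated[str, FromContext("logger")]
BrokerName = Annotated[str, FromContext("broker")]


def resolve(func, store):
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    hints = get_type_hints(func, include_extras=True)
    found = {}
    for name, hint in hints.items():
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, FromContext):
                found[name] = store[meta.key]
    return found


def handle_response(msg: str, log: LoggerName, brk: BrokerName) -> None: ...


resolved = resolve(handle_response, {"logger": "app-logger", "broker": "kafka-broker"})
```

Because the key travels with the annotation, the arguments in this sketch can be called `log` and `brk` and still receive the right objects, while plain parameters such as `msg` are left alone.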
Last update: 2023-09-21