Existing Fields

Context already contains some global objects that you can always access:

- broker - the current broker
- context - the context itself, in which you can write your own fields
- logger - the logger used for your broker (tags messages with message_id)
- message - the raw message (if you need access to it)

At the same time, thanks to contextvars.ContextVar, message is local to your current consumer scope.
Access to Context Fields

By default, the context looks up an object by the name of the handler argument.
AIOKafka Confluent RabbitMQ NATS Redis
from faststream import Context , FastStream
from faststream.kafka import KafkaBroker
# Broker connection and the application object that wraps it.
broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)


# Each `Context()` default below is looked up in the context by the
# argument's own name, so the parameter names must stay as they are.
@broker_object.subscriber("test-topic")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),  # requested for illustration; not used in the body
):
    # Log the raw message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")
from faststream import Context , FastStream
from faststream.confluent import KafkaBroker
# Broker connection (Confluent client) and the application object that wraps it.
broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)


# Each `Context()` default below is looked up in the context by the
# argument's own name, so the parameter names must stay as they are.
@broker_object.subscriber("test-topic")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),  # requested for illustration; not used in the body
):
    # Log the raw message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")
from faststream import Context , FastStream
from faststream.rabbit import RabbitBroker
# Broker connection and the application object that wraps it.
broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)


# Each `Context()` default below is looked up in the context by the
# argument's own name, so the parameter names must stay as they are.
@broker_object.subscriber("test-queue")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),  # requested for illustration; not used in the body
):
    # Log the raw message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")
from faststream import Context , FastStream
from faststream.nats import NatsBroker
# Broker connection and the application object that wraps it.
broker_object = NatsBroker("nats://localhost:4222")
app = FastStream(broker_object)


# Each `Context()` default below is looked up in the context by the
# argument's own name, so the parameter names must stay as they are.
@broker_object.subscriber("test-subject")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),  # requested for illustration; not used in the body
):
    # Log the raw message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")
from faststream import Context , FastStream
from faststream.redis import RedisBroker
# Broker connection and the application object that wraps it.
broker_object = RedisBroker("redis://localhost:6379")
app = FastStream(broker_object)


# Each `Context()` default below is looked up in the context by the
# argument's own name, so the parameter names must stay as they are.
@broker_object.subscriber("test-channel")
async def handle(
    msg: str,
    logger=Context(),
    message=Context(),
    broker=Context(),
    context=Context(),  # requested for illustration; not used in the body
):
    # Log the raw message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")
Annotated Aliases

FastStream also provides ready-made Annotated aliases that give you convenient access to the existing objects. You can import them directly from faststream or from your broker-specific modules:
from faststream import Logger , ContextRepo
AIOKafka Confluent RabbitMQ NATS Redis
from faststream.kafka.annotations import (
Logger , ContextRepo , KafkaMessage ,
KafkaBroker , KafkaProducer , NoCast ,
)
faststream.kafka.KafkaMessage
is an alias to faststream.kafka.annotations.KafkaMessage
from faststream.kafka import KafkaMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
ContextRepo ,
KafkaMessage ,
Logger ,
KafkaBroker as BrokerAnnotation ,
)
# `KafkaBroker` here is the broker class from `faststream.kafka`; the
# annotation alias of the same name is imported as `BrokerAnnotation`.
broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)


# The annotation aliases are used as plain argument annotations, so no
# `Context()` defaults are needed.
@broker_object.subscriber("response-topic")
async def handle_response(
    msg: str,
    logger: Logger,
    message: KafkaMessage,
    context: ContextRepo,  # requested for illustration; not used in the body
    broker: BrokerAnnotation,
):
    # Log the message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")
from faststream.confluent.annotations import (
Logger , ContextRepo , KafkaMessage ,
KafkaBroker , KafkaProducer , NoCast ,
)
faststream.confluent.KafkaMessage
is an alias to faststream.confluent.annotations.KafkaMessage
from faststream.confluent import KafkaMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.annotations import (
ContextRepo ,
KafkaMessage ,
Logger ,
KafkaBroker as BrokerAnnotation ,
)
# `KafkaBroker` here is the broker class from `faststream.confluent`; the
# annotation alias of the same name is imported as `BrokerAnnotation`.
broker_object = KafkaBroker("localhost:9092")
app = FastStream(broker_object)


# The annotation aliases are used as plain argument annotations, so no
# `Context()` defaults are needed.
@broker_object.subscriber("response-topic")
async def handle_response(
    msg: str,
    logger: Logger,
    message: KafkaMessage,
    context: ContextRepo,  # requested for illustration; not used in the body
    broker: BrokerAnnotation,
):
    # Log the message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")
from faststream.rabbit.annotations import (
Logger , ContextRepo , RabbitMessage ,
RabbitBroker , RabbitProducer , NoCast ,
)
faststream.rabbit.RabbitMessage
is an alias to faststream.rabbit.annotations.RabbitMessage
from faststream.rabbit import RabbitMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import (
ContextRepo ,
RabbitMessage ,
Logger ,
RabbitBroker as BrokerAnnotation ,
)
# `RabbitBroker` here is the broker class from `faststream.rabbit`; the
# annotation alias of the same name is imported as `BrokerAnnotation`.
broker_object = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker_object)


# The annotation aliases are used as plain argument annotations, so no
# `Context()` defaults are needed.
@broker_object.subscriber("response-queue")
async def handle_response(
    msg: str,
    logger: Logger,
    message: RabbitMessage,
    context: ContextRepo,  # requested for illustration; not used in the body
    broker: BrokerAnnotation,
):
    # Log the message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")
from faststream.nats.annotations import (
Logger , ContextRepo , NatsMessage ,
NatsBroker , NatsProducer , NatsJsProducer ,
Client , JsClient , NoCast ,
)
faststream.nats.NatsMessage
is an alias to faststream.nats.annotations.NatsMessage
from faststream.nats import NatsMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.nats import NatsBroker
from faststream.nats.annotations import (
ContextRepo ,
NatsMessage ,
Logger ,
NatsBroker as BrokerAnnotation ,
)
# `NatsBroker` here is the broker class from `faststream.nats`; the
# annotation alias of the same name is imported as `BrokerAnnotation`.
broker_object = NatsBroker("nats://localhost:4222")
app = FastStream(broker_object)


# The annotation aliases are used as plain argument annotations, so no
# `Context()` defaults are needed.
@broker_object.subscriber("response-subject")
async def handle_response(
    msg: str,
    logger: Logger,
    message: NatsMessage,
    context: ContextRepo,  # requested for illustration; not used in the body
    broker: BrokerAnnotation,
):
    # Log the message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")
from faststream.redis.annotations import (
Logger , ContextRepo , RedisMessage ,
RedisBroker , Redis , NoCast ,
)
faststream.redis.RedisMessage
is an alias to faststream.redis.annotations.RedisMessage
from faststream.redis import RedisMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.redis import RedisBroker
from faststream.redis.annotations import (
ContextRepo ,
RedisMessage ,
Logger ,
RedisBroker as BrokerAnnotation ,
)
# `RedisBroker` here is the broker class from `faststream.redis`; the
# annotation alias of the same name is imported as `BrokerAnnotation`.
broker_object = RedisBroker("redis://localhost:6379")
app = FastStream(broker_object)


# The annotation aliases are used as plain argument annotations, so no
# `Context()` defaults are needed.
@broker_object.subscriber("response-channel")
async def handle_response(
    msg: str,
    logger: Logger,
    message: RedisMessage,
    context: ContextRepo,  # requested for illustration; not used in the body
    broker: BrokerAnnotation,
):
    # Log the message object, then publish through the injected broker.
    logger.info(message)
    await broker.publish("test", "response")