Existing Fields Context already contains some global objects that you can always access:
broker - the current broker context - the context itself, in which you can write your own fields logger - the logger used for your broker (tags messages with message_id ) message - the raw message (if you need access to it) At the same time, thanks to contextlib.ContextVar
, message is local for you current consumer scope.
Access to Context Fields By default, the context searches for an object based on the argument name.
from faststream import Context , FastStream
from faststream.kafka import KafkaBroker
broker_object = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "test-topic" )
async def handle (
msg : str ,
logger = Context (),
message = Context (),
broker = Context (),
context = Context (),
):
logger . info ( message )
await broker . publish ( "test" , "response" )
from faststream import Context , FastStream
from faststream.confluent import KafkaBroker
broker_object = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "test-topic" )
async def handle (
msg : str ,
logger = Context (),
message = Context (),
broker = Context (),
context = Context (),
):
logger . info ( message )
await broker . publish ( "test" , "response" )
from faststream import Context , FastStream
from faststream.rabbit import RabbitBroker
broker_object = RabbitBroker ( "amqp://guest:guest@localhost:5672/" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "test-queue" )
async def handle (
msg : str ,
logger = Context (),
message = Context (),
broker = Context (),
context = Context (),
):
logger . info ( message )
await broker . publish ( "test" , "response" )
from faststream import Context , FastStream
from faststream.nats import NatsBroker
broker_object = NatsBroker ( "nats://localhost:4222" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "test-subject" )
async def handle (
msg : str ,
logger = Context (),
message = Context (),
broker = Context (),
context = Context (),
):
logger . info ( message )
await broker . publish ( "test" , "response" )
from faststream import Context , FastStream
from faststream.redis import RedisBroker
broker_object = RedisBroker ( "redis://localhost:6379" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "test-channel" )
async def handle (
msg : str ,
logger = Context (),
message = Context (),
broker = Context (),
context = Context (),
):
logger . info ( message )
await broker . publish ( "test" , "response" )
Annotated Aliases Also, FastStream has already created Annotated
aliases to provide you with comfortable access to existing objects. You can import them directly from faststream
or your broker-specific modules:
from faststream import Logger , ContextRepo
from faststream.kafka.annotations import (
Logger , ContextRepo , KafkaMessage ,
KafkaBroker , KafkaProducer , NoCast ,
)
faststream.kafka.KafkaMessage
is an alias to faststream.kafka.annotations.KafkaMessage
from faststream.kafka import KafkaMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
ContextRepo ,
KafkaMessage ,
Logger ,
KafkaBroker as BrokerAnnotation ,
)
broker_object = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "response-topic" )
async def handle_response (
msg : str ,
logger : Logger ,
message : KafkaMessage ,
context : ContextRepo ,
broker : BrokerAnnotation ,
):
logger . info ( message )
await broker . publish ( "test" , "response" )
from faststream.confluent.annotations import (
Logger , ContextRepo , KafkaMessage ,
KafkaBroker , KafkaProducer , NoCast ,
)
faststream.confluent.KafkaMessage
is an alias to faststream.confluent.annotations.KafkaMessage
from faststream.confluent import KafkaMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.annotations import (
ContextRepo ,
KafkaMessage ,
Logger ,
KafkaBroker as BrokerAnnotation ,
)
broker_object = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "response-topic" )
async def handle_response (
msg : str ,
logger : Logger ,
message : KafkaMessage ,
context : ContextRepo ,
broker : BrokerAnnotation ,
):
logger . info ( message )
await broker . publish ( "test" , "response" )
from faststream.rabbit.annotations import (
Logger , ContextRepo , RabbitMessage ,
RabbitBroker , RabbitProducer , NoCast ,
)
faststream.rabbit.RabbitMessage
is an alias to faststream.rabbit.annotations.RabbitMessage
from faststream.rabbit import RabbitMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.annotations import (
ContextRepo ,
RabbitMessage ,
Logger ,
RabbitBroker as BrokerAnnotation ,
)
broker_object = RabbitBroker ( "amqp://guest:guest@localhost:5672/" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "response-queue" )
async def handle_response (
msg : str ,
logger : Logger ,
message : RabbitMessage ,
context : ContextRepo ,
broker : BrokerAnnotation ,
):
logger . info ( message )
await broker . publish ( "test" , "response" )
from faststream.nats.annotations import (
Logger , ContextRepo , NatsMessage ,
NatsBroker , NatsProducer , NatsJsProducer ,
Client , JsClient , NoCast ,
)
faststream.nats.NatsMessage
is an alias to faststream.nats.annotations.NatsMessage
from faststream.nats import NatsMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.nats import NatsBroker
from faststream.nats.annotations import (
ContextRepo ,
NatsMessage ,
Logger ,
NatsBroker as BrokerAnnotation ,
)
broker_object = NatsBroker ( "nats://localhost:4222" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "response-subject" )
async def handle_response (
msg : str ,
logger : Logger ,
message : NatsMessage ,
context : ContextRepo ,
broker : BrokerAnnotation ,
):
logger . info ( message )
await broker . publish ( "test" , "response" )
from faststream.redis.annotations import (
Logger , ContextRepo , RedisMessage ,
RedisBroker , Redis , NoCast ,
)
faststream.redis.RedisMessage
is an alias to faststream.redis.annotations.RedisMessage
from faststream.redis import RedisMessage
To use them, simply import and use them as subscriber argument annotations.
from faststream import Context , FastStream
from faststream.redis import RedisBroker
from faststream.redis.annotations import (
ContextRepo ,
RedisMessage ,
Logger ,
RedisBroker as BrokerAnnotation ,
)
broker_object = RedisBroker ( "redis://localhost:6379" )
app = FastStream ( broker_object )
@broker_object . subscriber ( "response-channel" )
async def handle_response (
msg : str ,
logger : Logger ,
message : RedisMessage ,
context : ContextRepo ,
broker : BrokerAnnotation ,
):
logger . info ( message )
await broker . publish ( "test" , "response" )