Access by Name Sometimes, you may need to use a different name for the argument (not the one under which it is stored in the context) or get access to specific parts of the object. To do this, simply specify the name of what you want to access, and the context will provide you with the object.
AIOKafka Confluent RabbitMQ NATS Redis
from faststream import Context , FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.message import KafkaMessage
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
@broker . subscriber ( "test-topic" )
async def handle (
msg : KafkaMessage = Context ( "message" ),
correlation_id : str = Context ( "message.correlation_id" ),
user_header : str = Context ( "message.headers.user" ),
):
assert msg . correlation_id == correlation_id
assert msg . headers [ "user" ] == user_header
This way you can get access to context object by its name
msg : KafkaMessage = Context ( "message" ),
from faststream import Context , FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.message import KafkaMessage
broker = KafkaBroker ( "localhost:9092" )
app = FastStream ( broker )
@broker . subscriber ( "test-topic" )
async def handle (
msg : KafkaMessage = Context ( "message" ),
correlation_id : str = Context ( "message.correlation_id" ),
user_header : str = Context ( "message.headers.user" ),
):
assert msg . correlation_id == correlation_id
assert msg . headers [ "user" ] == user_header
This way you can get access to context object by its name
msg : KafkaMessage = Context ( "message" ),
from faststream import Context , FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.message import RabbitMessage
broker = RabbitBroker ( "amqp://guest:guest@localhost:5672/" )
app = FastStream ( broker )
@broker . subscriber ( "test-queue" )
async def handle (
msg : RabbitMessage = Context ( "message" ),
correlation_id : str = Context ( "message.correlation_id" ),
user_header : str = Context ( "message.headers.user" ),
):
assert msg . correlation_id == correlation_id
assert msg . headers [ "user" ] == user_header
This way you can get access to context object by its name
msg : RabbitMessage = Context ( "message" ),
from faststream import Context , FastStream
from faststream.nats import NatsBroker
from faststream.nats.message import NatsMessage
broker = NatsBroker ( "nats://localhost:4222" )
app = FastStream ( broker )
@broker . subscriber ( "test-subject" )
async def handle (
msg : NatsMessage = Context ( "message" ),
correlation_id : str = Context ( "message.correlation_id" ),
user_header : str = Context ( "message.headers.user" ),
):
assert msg . correlation_id == correlation_id
assert msg . headers [ "user" ] == user_header
This way you can get access to context object by its name
msg : NatsMessage = Context ( "message" ),
from faststream import Context , FastStream
from faststream.redis import RedisBroker
from faststream.redis.message import RedisMessage
broker = RedisBroker ( "redis://localhost:6379" )
app = FastStream ( broker )
@broker . subscriber ( "test-channel" )
async def handle (
msg : RedisMessage = Context ( "message" ),
correlation_id : str = Context ( "message.correlation_id" ),
user_header : str = Context ( "message.headers.user" ),
):
assert msg . correlation_id == correlation_id
assert msg . headers [ "user" ] == user_header
This way you can get access to context object by its name
msg : RedisMessage = Context ( "message" ),
This way you can get access to context object specific field
correlation_id : str = Context ( "message.correlation_id" ),
Or even to a dict key
user_header : str = Context ( "message.headers.user" ),