fromfaststreamimportContext,FastStreamfromfaststream.kafkaimportKafkaBrokerbroker=KafkaBroker("localhost:9092")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message=Context(),# get access to raw message):...
fromfaststreamimportContext,FastStreamfromfaststream.confluentimportKafkaBrokerbroker=KafkaBroker("localhost:9092")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message=Context(),# get access to raw message):...
fromfaststreamimportContext,FastStreamfromfaststream.rabbitimportRabbitBrokerbroker=RabbitBroker("amqp://guest:guest@localhost:5672/")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message=Context(),# get access to raw message):...
fromfaststreamimportContext,FastStreamfromfaststream.natsimportNatsBrokerbroker=NatsBroker("nats://localhost:4222")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message=Context(),# get access to raw message):...
fromfaststreamimportContext,FastStreamfromfaststream.redisimportRedisBrokerbroker=RedisBroker("redis://localhost:6379")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message=Context(),# get access to raw message):...
But, with the Annotated Python feature usage, it is much closer to @pytest.fixture.
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.kafkaimportKafkaBrokerfromfaststream.kafka.messageimportKafkaMessageMessage=Annotated[KafkaMessage,Context()]broker=KafkaBroker("localhost:9092")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Message,# get access to raw message):...
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.confluentimportKafkaBrokerfromfaststream.confluent.messageimportKafkaMessageMessage=Annotated[KafkaMessage,Context()]broker=KafkaBroker("localhost:9092")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Message,# get access to raw message):...
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.rabbitimportRabbitBrokerfromfaststream.rabbit.messageimportRabbitMessageMessage=Annotated[RabbitMessage,Context()]broker=RabbitBroker("amqp://guest:guest@localhost:5672/")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Message,# get access to raw message):...
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.natsimportNatsBrokerfromfaststream.nats.messageimportNatsMessageMessage=Annotated[NatsMessage,Context()]broker=NatsBroker("nats://localhost:4222")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Message,# get access to raw message):...
fromtypingimportAnnotatedfromfaststreamimportContext,FastStreamfromfaststream.redisimportRedisBrokerfromfaststream.redis.messageimportRedisMessageMessage=Annotated[RedisMessage,Context()]broker=RedisBroker("redis://localhost:6379")app=FastStream(broker)@broker.subscriber("test")asyncdefbase_handler(body:str,message:Message,# get access to raw message):...
To use context in other functions, use the @apply_types decorator. In this case, the context of the called function will correspond to the context of the event handler from which it was called.