Besides, FastStream uses your handlers' annotations to collect information about the application schema and generate AsyncAPI schema.
You can access this information with extra details using pydantic.Field (such as title, description and examples). Additionally, Fields usage allows you to add extra validations to your message schema.
Just use pydantic.Field as a function default argument:
frompydanticimportField,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.kafkaimportKafkaBrokerbroker=KafkaBroker("localhost:9092")app=FastStream(broker)@broker.subscriber("test-topic")asyncdefhandle(name:str=Field(...,examples=["John"],description="Registered user name"),user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id"),):assertname=="John"assertuser_id==1
frompydanticimportField,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.confluentimportKafkaBrokerbroker=KafkaBroker("localhost:9092")app=FastStream(broker)@broker.subscriber("test-topic-confluent",auto_offset_reset="earliest")asyncdefhandle(name:str=Field(...,examples=["John"],description="Registered user name"),user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id"),):assertname=="John"assertuser_id==1@broker.subscriber("test-confluent-wrong-fields",auto_offset_reset="earliest")asyncdefwrong_handle(name:str=Field(...,examples=["John"],description="Registered user name"),user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id"),):assertname=="John"assertuser_id==1
frompydanticimportField,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.rabbitimportRabbitBrokerbroker=RabbitBroker("amqp://guest:guest@localhost:5672/")app=FastStream(broker)@broker.subscriber("test-queue")asyncdefhandle(name:str=Field(...,examples=["John"],description="Registered user name"),user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id"),):assertname=="John"assertuser_id==1
frompydanticimportField,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.natsimportNatsBrokerbroker=NatsBroker("nats://localhost:4222")app=FastStream(broker)@broker.subscriber("test-subject")asyncdefhandle(name:str=Field(...,examples=["John"],description="Registered user name"),user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id"),):assertname=="John"assertuser_id==1
frompydanticimportField,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.redisimportRedisBrokerbroker=RedisBroker("redis://localhost:6379")app=FastStream(broker)@broker.subscriber("test-channel")asyncdefhandle(name:str=Field(...,examples=["John"],description="Registered user name"),user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id"),):assertname=="John"assertuser_id==1
Tip
Also you can use typing.Annotated (python 3.9+) or typing_extensions.Annotated to declare your handler fields
name:Annotated[str,Field(...,examples=["John"],description="Registered user name")],user_id:Annotated[NonNegativeInt,Field(...,examples=[1],description="Registered user id"),]
To make your message schema reusable between different subscribers and publishers, you can declare it as a pydantic.BaseModel and use it as a single message annotation:
frompydanticimportBaseModel,Field,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.kafkaimportKafkaBrokerbroker=KafkaBroker("localhost:9092")app=FastStream(broker)classUserInfo(BaseModel):name:str=Field(...,examples=["John"],description="Registered user name")user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id")@broker.subscriber("test-topic")asyncdefhandle(user:UserInfo,):assertuser.name=="John"assertuser.user_id==1
frompydanticimportBaseModel,Field,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.confluentimportKafkaBrokerbroker=KafkaBroker("localhost:9092")app=FastStream(broker)classUserInfo(BaseModel):name:str=Field(...,examples=["John"],description="Registered user name")user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id")@broker.subscriber("test-topic")asyncdefhandle(user:UserInfo,):assertuser.name=="John"assertuser.user_id==1
frompydanticimportBaseModel,Field,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.rabbitimportRabbitBrokerbroker=RabbitBroker("amqp://guest:guest@localhost:5672/")app=FastStream(broker)classUserInfo(BaseModel):name:str=Field(...,examples=["John"],description="Registered user name")user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id")@broker.subscriber("test-queue")asyncdefhandle(user:UserInfo,):assertuser.name=="John"assertuser.user_id==1
frompydanticimportBaseModel,Field,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.natsimportNatsBrokerbroker=NatsBroker("nats://localhost:4222")app=FastStream(broker)classUserInfo(BaseModel):name:str=Field(...,examples=["John"],description="Registered user name")user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id")@broker.subscriber("test-subject")asyncdefhandle(user:UserInfo,):assertuser.name=="John"assertuser.user_id==1
frompydanticimportBaseModel,Field,NonNegativeIntfromfaststreamimportFastStreamfromfaststream.redisimportRedisBrokerbroker=RedisBroker("redis://localhost:6379")app=FastStream(broker)classUserInfo(BaseModel):name:str=Field(...,examples=["John"],description="Registered user name")user_id:NonNegativeInt=Field(...,examples=[1],description="Registered user id")@broker.subscriber("test-channel")asyncdefhandle(user:UserInfo,):assertuser.name=="John"assertuser.user_id==1