Skip to content

Pydantic Serialization#

pydantic.Field#

FastStream also uses your handlers' annotations to collect information about the application schema and generate the AsyncAPI schema.

You can enrich this information with extra details (such as title, description, and examples) using pydantic.Field. Using Field also lets you add extra validation to your message schema.

Just use pydantic.Field as a function default argument:

from pydantic import Field, NonNegativeInt

from faststream import FastStream
from faststream.kafka import KafkaBroker

# Kafka broker configured with the address "localhost:9092", and the
# FastStream application object built around that broker.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# Handler registered on "test-topic". Each parameter uses pydantic.Field as
# its default value to attach a description and examples to the matching
# message field; `...` as the first argument marks the field as required.
@broker.subscriber("test-topic")
async def handle(
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    ),
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    ),
):
    # The example expects exactly the values listed in `examples` above.
    assert name == "John"
    assert user_id == 1
from pydantic import Field, NonNegativeInt

from faststream import FastStream
from faststream.confluent import KafkaBroker

# Confluent-based Kafka broker configured with the address "localhost:9092",
# and the FastStream application object built around that broker.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# Handler registered on "test-topic-confluent" (subscriber created with
# auto_offset_reset="earliest"). Each parameter uses pydantic.Field as its
# default value to attach a description and examples to the matching message
# field; `...` as the first argument marks the field as required.
@broker.subscriber("test-topic-confluent", auto_offset_reset="earliest")
async def handle(
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    ),
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    ),
):
    # The example expects exactly the values listed in `examples` above.
    assert name == "John"
    assert user_id == 1


# Second subscriber with the same signature as `handle`, registered on
# "test-confluent-wrong-fields".
# NOTE(review): the topic and function names suggest this one is fed messages
# with invalid fields — confirm against the code that publishes to it.
@broker.subscriber("test-confluent-wrong-fields", auto_offset_reset="earliest")
async def wrong_handle(
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    ),
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    ),
):
    # Same expectations as in `handle`.
    assert name == "John"
    assert user_id == 1
from pydantic import Field, NonNegativeInt

from faststream import FastStream
from faststream.rabbit import RabbitBroker

# RabbitMQ broker configured with the URL
# "amqp://guest:guest@localhost:5672/", and the FastStream application
# object built around that broker.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


# Handler registered on "test-queue". Each parameter uses pydantic.Field as
# its default value to attach a description and examples to the matching
# message field; `...` as the first argument marks the field as required.
@broker.subscriber("test-queue")
async def handle(
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    ),
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    ),
):
    # The example expects exactly the values listed in `examples` above.
    assert name == "John"
    assert user_id == 1
from pydantic import Field, NonNegativeInt

from faststream import FastStream
from faststream.nats import NatsBroker

# NATS broker configured with the URL "nats://localhost:4222", and the
# FastStream application object built around that broker.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


# Handler registered on "test-subject". Each parameter uses pydantic.Field as
# its default value to attach a description and examples to the matching
# message field; `...` as the first argument marks the field as required.
@broker.subscriber("test-subject")
async def handle(
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    ),
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    ),
):
    # The example expects exactly the values listed in `examples` above.
    assert name == "John"
    assert user_id == 1
from pydantic import Field, NonNegativeInt

from faststream import FastStream
from faststream.redis import RedisBroker

# Redis broker configured with the URL "redis://localhost:6379", and the
# FastStream application object built around that broker.
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


# Handler registered on "test-channel". Each parameter uses pydantic.Field as
# its default value to attach a description and examples to the matching
# message field; `...` as the first argument marks the field as required.
@broker.subscriber("test-channel")
async def handle(
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    ),
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    ),
):
    # The example expects exactly the values listed in `examples` above.
    assert name == "John"
    assert user_id == 1

Tip

You can also use typing.Annotated (Python 3.9+) or typing_extensions.Annotated to declare your handler fields:

name: Annotated[
    str,
    Field(..., examples=["John"], description="Registered user name")
],
user_id: Annotated[
    NonNegativeInt,
    Field(..., examples=[1], description="Registered user id"),
]

pydantic.BaseModel#

To make your message schema reusable between different subscribers and publishers, you can declare it as a pydantic.BaseModel and use it as a single message annotation:

from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream
from faststream.kafka import KafkaBroker

# Kafka broker configured with the address "localhost:9092", and the
# FastStream application object built around that broker.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# Reusable message schema. Both fields are required (`...`) and carry a
# description and examples declared through pydantic.Field; `user_id` is
# additionally typed as NonNegativeInt.
class UserInfo(BaseModel):
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    )
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    )


# Handler registered on "test-topic". The message is annotated as a single
# UserInfo model instead of separate per-field parameters.
@broker.subscriber("test-topic")
async def handle(user: UserInfo):
    # Example expectations for the fields of the received model.
    assert user.name == "John"
    assert user.user_id == 1
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream
from faststream.confluent import KafkaBroker

# Confluent-based Kafka broker configured with the address "localhost:9092",
# and the FastStream application object built around that broker.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


# Reusable message schema. Both fields are required (`...`) and carry a
# description and examples declared through pydantic.Field; `user_id` is
# additionally typed as NonNegativeInt.
class UserInfo(BaseModel):
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    )
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    )


# Handler registered on "test-topic". The message is annotated as a single
# UserInfo model instead of separate per-field parameters.
@broker.subscriber("test-topic")
async def handle(user: UserInfo):
    # Example expectations for the fields of the received model.
    assert user.name == "John"
    assert user.user_id == 1
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream
from faststream.rabbit import RabbitBroker

# RabbitMQ broker configured with the URL
# "amqp://guest:guest@localhost:5672/", and the FastStream application
# object built around that broker.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


# Reusable message schema. Both fields are required (`...`) and carry a
# description and examples declared through pydantic.Field; `user_id` is
# additionally typed as NonNegativeInt.
class UserInfo(BaseModel):
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    )
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    )


# Handler registered on "test-queue". The message is annotated as a single
# UserInfo model instead of separate per-field parameters.
@broker.subscriber("test-queue")
async def handle(user: UserInfo):
    # Example expectations for the fields of the received model.
    assert user.name == "John"
    assert user.user_id == 1
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream
from faststream.nats import NatsBroker

# NATS broker configured with the URL "nats://localhost:4222", and the
# FastStream application object built around that broker.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


# Reusable message schema. Both fields are required (`...`) and carry a
# description and examples declared through pydantic.Field; `user_id` is
# additionally typed as NonNegativeInt.
class UserInfo(BaseModel):
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    )
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    )


# Handler registered on "test-subject". The message is annotated as a single
# UserInfo model instead of separate per-field parameters.
@broker.subscriber("test-subject")
async def handle(user: UserInfo):
    # Example expectations for the fields of the received model.
    assert user.name == "John"
    assert user.user_id == 1
from pydantic import BaseModel, Field, NonNegativeInt

from faststream import FastStream
from faststream.redis import RedisBroker

# Redis broker configured with the URL "redis://localhost:6379", and the
# FastStream application object built around that broker.
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


# Reusable message schema. Both fields are required (`...`) and carry a
# description and examples declared through pydantic.Field; `user_id` is
# additionally typed as NonNegativeInt.
class UserInfo(BaseModel):
    name: str = Field(
        ...,
        description="Registered user name",
        examples=["John"],
    )
    user_id: NonNegativeInt = Field(
        ...,
        description="Registered user id",
        examples=[1],
    )


# Handler registered on "test-channel". The message is annotated as a single
# UserInfo model instead of separate per-field parameters.
@broker.subscriber("test-channel")
async def handle(user: UserInfo):
    # Example expectations for the fields of the received model.
    assert user.name == "John"
    assert user.user_id == 1