Skip to content

Annotation Serialization#

Basic usage#

As you already know, FastStream serializes your incoming message body according to the function type annotations using Pydantic.

So, there are some valid use cases:

@broker.subscriber("test")
async def handle(
    msg: str,
):
    ...

@broker.subscriber("test")
async def handle(
    msg: bytes,
):
    ...

@broker.subscriber("test")
async def handle(
    msg: int,
):
    ...

As with other Python primitive types as well (float, bool, datetime, etc)

Note

If the incoming message cannot be serialized by the described schema, FastStream raises a pydantic.ValidationError with a correct log message.

Also, thanks to Pydantic (again), FastStream is able to serialize (and validate) more complex types like pydantic.HttpUrl, pydantic.PositiveInt, etc.

JSON Basic Serialization#

But how can we serialize more complex message, like { "name": "John", "user_id": 1 } ?

For sure, we can serialize it as a simple dict

from typing import Dict, Any

@broker.subscriber("test")
async def handle(
    msg: dict[str, Any],
):
    ...

But it doesn't looks like a correct message validation, does it?

For this reason, FastStream supports per-argument message serialization: you can declare multiple arguments with various types and your message will unpack to them:

@broker.subscriber("test-topic")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1
@broker.subscriber("test-topic")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1
@broker.subscriber("test-queue")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1
@broker.subscriber("test-subject")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1
@broker.subscriber("test-channel")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1

Tip

By default FastStream uses json.loads to decode and json.dumps to encode your messages. But if you prefer orjson, just install it and framework will use it automatically.