Annotation Serialization

Basic usage

As you already know, FastStream serializes your incoming message body according to the function type annotations using Pydantic.

So, here are some valid use cases:

@broker.subscriber("test")
async def handle(msg: str):
    ...

@broker.subscriber("test")
async def handle(msg: bytes):
    ...

@broker.subscriber("test")
async def handle(msg: int):
    ...

The same works for other standard Python types (float, bool, datetime, etc.).

Note

If the incoming message cannot be serialized according to the declared annotation, FastStream raises a pydantic.ValidationError and logs an appropriate message.

Also, thanks to Pydantic (again), FastStream is able to serialize (and validate) more complex types like pydantic.HttpUrl, pydantic.PositiveInt, etc.
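
To make the validation behaviour concrete, here is a minimal sketch that uses Pydantic directly, outside of any broker. The Payload model and its count field are hypothetical names chosen for illustration. The sketch only shows how Pydantic itself treats a constrained type, and it assumes FastStream applies the same kind of check to an annotated handler argument.

```python
from pydantic import BaseModel, PositiveInt, ValidationError


class Payload(BaseModel):
    # Hypothetical model, used only for this illustration
    count: PositiveInt


# By default, Pydantic coerces a numeric string to int
ok = Payload(count="5")

# A non-positive value violates the PositiveInt constraint
try:
    Payload(count=-1)
    rejected = False
except ValidationError:
    rejected = True
```

Here ok.count holds the integer 5, while the second call is rejected with a pydantic.ValidationError.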

JSON Basic Serialization

But how can we serialize a more complex message, like { "name": "John", "user_id": 1 }?

Of course, we can serialize it as a simple dict:

from typing import Any

@broker.subscriber("test")
async def handle(msg: dict[str, Any]):
    ...

But that doesn't look like proper message validation, does it?

For this reason, FastStream supports per-argument message serialization: you can declare multiple arguments with various types, and your message will be unpacked into them:

# Subscribing to a topic (e.g. Kafka)
@broker.subscriber("test-topic")
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

# Subscribing to a queue (e.g. RabbitMQ)
@broker.subscriber("test-queue")
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1

# Subscribing to a subject (e.g. NATS)
@broker.subscriber("test-subject")
async def handle(name: str, user_id: int):
    assert name == "John"
    assert user_id == 1
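
The idea behind per-argument unpacking can be sketched with the standard library alone. The helper call_with_unpacked_body below is hypothetical and is not part of FastStream. It assumes the body is a JSON object, and it stands in for Pydantic's validation with a naive call to each annotation. It is meant only to show how the message fields map onto the handler's parameter names.

```python
import inspect
import json
from typing import Any, Callable


def call_with_unpacked_body(handler: Callable[..., Any], raw_body: bytes) -> Any:
    """Decode a JSON object and pass its fields to `handler` by parameter name.

    Hypothetical helper for illustration only; not FastStream's implementation.
    """
    payload = json.loads(raw_body)
    kwargs = {}
    for name, param in inspect.signature(handler).parameters.items():
        if name not in payload:
            raise ValueError(f"missing field: {name}")
        value = payload[name]
        # Naive coercion through the annotation; real validation is far stricter
        if param.annotation is not inspect.Parameter.empty:
            value = param.annotation(value)
        kwargs[name] = value
    return handler(**kwargs)


def handle(name: str, user_id: int):
    return name, user_id


result = call_with_unpacked_body(handle, b'{"name": "John", "user_id": "1"}')
```

Each key of the JSON object is matched to the parameter of the same name, so handle receives name and user_id as separate, typed arguments instead of a single dict.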

Last update: 2023-09-21