Annotation Serialization
Basic usage
As you already know, FastStream serializes your incoming message body according to the function type annotations using Pydantic.
So, all of the following handlers are valid:
@broker.subscriber("test")
async def handle(
    msg: str,
):
    ...

@broker.subscriber("test")
async def handle(
    msg: bytes,
):
    ...

@broker.subscriber("test")
async def handle(
    msg: int,
):
    ...
The same works for other Python primitive types (float, bool, datetime, etc.).
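To make the idea of annotation-driven decoding concrete, here is a minimal standard-library sketch. It is not how FastStream is implemented (FastStream delegates this work to Pydantic); `decode_for` is a hypothetical helper that only reads the first parameter's annotation and applies a naive conversion.

```python
import inspect
import json
from typing import Any, Callable, get_type_hints


def decode_for(handler: Callable[..., Any], raw: bytes) -> Any:
    """Decode `raw` according to the annotation of the handler's first parameter.

    Hypothetical helper: a simplified stand-in for Pydantic-based validation.
    """
    first_param = next(iter(inspect.signature(handler).parameters))
    annotation = get_type_hints(handler).get(first_param, bytes)

    if annotation is bytes:
        return raw  # pass the raw body through untouched
    if annotation is str:
        return raw.decode("utf-8")
    # Anything else: parse the body as JSON, then coerce with the annotated type.
    return annotation(json.loads(raw))


async def handle_str(msg: str): ...
async def handle_bytes(msg: bytes): ...
async def handle_int(msg: int): ...


text = decode_for(handle_str, b"hello")
blob = decode_for(handle_bytes, b"hello")
number = decode_for(handle_int, b"42")
```

The same body can therefore reach a handler as `str`, `bytes`, or `int`, depending only on the annotation. A real validator also rejects values that do not fit the schema, which this sketch does not attempt.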
Note
If the incoming message cannot be serialized according to the declared schema, FastStream raises a pydantic.ValidationError and logs a corresponding message.
Also, thanks to Pydantic (again), FastStream is able to serialize (and validate) more complex types like pydantic.HttpUrl, pydantic.PositiveInt, etc.
JSON Basic Serialization
But how can we serialize a more complex message, like { "name": "John", "user_id": 1 }?
Of course, we can serialize it as a simple dict:
from typing import Any

@broker.subscriber("test")
async def handle(
    msg: dict[str, Any],
):
    ...
But that doesn't look like proper message validation, does it?
For this reason, FastStream supports per-argument message serialization: you can declare multiple arguments with different types, and the fields of your message will be unpacked into them.
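The unpacking described above can be modelled with a short standard-library sketch. `unpack_message` is a hypothetical helper written for illustration only; FastStream performs this step itself (with Pydantic validation) once the handler is registered with `@broker.subscriber("test")` as in the earlier examples.

```python
import json
from typing import Any, Callable, get_type_hints


def unpack_message(handler: Callable[..., Any], raw: bytes) -> dict[str, Any]:
    """Map the fields of a JSON object onto the handler's annotated arguments.

    Hypothetical helper: naive coercion stands in for Pydantic validation.
    """
    payload = json.loads(raw)
    hints = get_type_hints(handler)
    hints.pop("return", None)  # only the arguments matter here

    kwargs: dict[str, Any] = {}
    for name, expected_type in hints.items():
        if name not in payload:
            raise ValueError(f"missing field: {name!r}")
        kwargs[name] = expected_type(payload[name])
    return kwargs


# The handler declares one argument per message field.
def handle(name: str, user_id: int) -> None:
    ...


kwargs = unpack_message(handle, b'{"name": "John", "user_id": 1}')
```

With the message from this section, the handler would be called with `name="John"` and `user_id=1`, so each field is checked against its own annotation instead of being accepted as an untyped dict.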
Tip
By default, FastStream uses json.loads to decode and json.dumps to encode your messages. But if you prefer orjson, just install it and the framework will use it automatically.
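An "install it and it is used automatically" behaviour is commonly achieved with an optional import. The sketch below shows that general pattern under the assumption that orjson may or may not be installed; the `encode` and `decode` helpers are hypothetical and are not FastStream's internal functions.

```python
import json
from typing import Any

try:
    import orjson  # optional third-party dependency
except ImportError:
    orjson = None  # fall back to the standard library


def encode(obj: Any) -> bytes:
    """Encode to JSON bytes, preferring orjson when it is available."""
    if orjson is not None:
        return orjson.dumps(obj)  # orjson.dumps already returns bytes
    return json.dumps(obj).encode("utf-8")


def decode(raw: bytes) -> Any:
    """Decode JSON bytes, preferring orjson when it is available."""
    if orjson is not None:
        return orjson.loads(raw)
    return json.loads(raw)


roundtrip = decode(encode({"name": "John", "user_id": 1}))
```

Both code paths expose the same bytes-in, bytes-out interface, so the calling code does not need to know which library handled the message.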