Serialization examples#

Protobuf#

In this section, we will explore an example using Protobuf. However, this approach is also applicable to other serialization methods.

Protobuf is an alternative message serialization method commonly used in gRPC. Its main advantage is that it produces much smaller messages than JSON [1], but it requires a message schema (.proto files) on both the client and server sides.

To begin, install the necessary dependencies:

pip install grpcio-tools

Next, let's define the schema for our message:

message.proto
syntax = "proto3";

message Person {
    string name = 1;
    float age = 2;
}

Now, generate a Python class to work with messages in Protobuf format:

python -m grpc_tools.protoc --python_out=. --pyi_out=. -I . message.proto

This generates two files: message_pb2.py and message_pb2.pyi. We can use the generated class to serialize our messages:

from message_pb2 import Person

from faststream import FastStream, Logger, NoCast
from faststream.rabbit import RabbitBroker, RabbitMessage

broker = RabbitBroker()
app = FastStream(broker)


async def decode_message(msg: RabbitMessage) -> Person:
    decoded = Person()
    decoded.ParseFromString(msg.body)
    return decoded


@broker.subscriber("test", decoder=decode_message)
async def consume(body: NoCast[Person], logger: Logger):
    logger.info(body)


@app.after_startup
async def publish():
    body = Person(name="John", age=25).SerializeToString()
    await broker.publish(body, "test")

Note that we used the NoCast annotation to exclude the message from the Pydantic representation of our handler:

async def consume(body: NoCast[Person], logger: Logger):
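To see where the size difference between JSON and Protobuf comes from, here is a minimal sketch that hand-encodes the Person message using only the standard library. It assumes the proto3 wire format for the schema above (field 1 is a length-delimited string, field 2 is a 32-bit float). The helper name encode_person is hypothetical, and this is an illustration of the byte layout, not a replacement for the generated message_pb2 class.

```python
import json
import struct


def encode_person(name: str, age: float) -> bytes:
    """Hand-encode Person(name, age) following the proto3 wire format.

    Illustration only: real Protobuf omits fields that hold default values
    and uses multi-byte varints for lengths of 128 or more.
    """
    name_bytes = name.encode("utf-8")
    if len(name_bytes) >= 128:
        raise ValueError("this sketch only handles single-byte lengths")
    # Field 1, wire type 2 (length-delimited): tag = (1 << 3) | 2 = 0x0A
    name_field = bytes([0x0A, len(name_bytes)]) + name_bytes
    # Field 2, wire type 5 (32-bit): tag = (2 << 3) | 5 = 0x15
    age_field = bytes([0x15]) + struct.pack("<f", age)
    return name_field + age_field


json_body = json.dumps({"name": "John", "age": 25}).encode()
proto_body = encode_person("John", 25.0)

print(len(json_body))   # 27
print(len(proto_body))  # 11
```

The JSON body repeats the field names as text in every message, while the Protobuf body carries only one-byte field tags, which is why both sides need the .proto schema to interpret it.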

Msgpack#

Msgpack is another binary data format. Its messages are smaller than JSON [2], although slightly larger than Protobuf. Its key advantage is that it doesn't require a message schema, which makes it easy to use in most cases.

To get started, install the necessary dependencies:

pip install msgpack

Since there is no need for a schema, you can easily write a Msgpack decoder:

import msgpack

from faststream import FastStream, Logger
from faststream.rabbit import RabbitBroker, RabbitMessage

broker = RabbitBroker()
app = FastStream(broker)


async def decode_message(msg: RabbitMessage):
    return msgpack.loads(msg.body)


@broker.subscriber("test", decoder=decode_message)
async def consume(name: str, age: int, logger: Logger):
    logger.info(f"{name}: {age}")


@app.after_startup
async def publish():
    body = msgpack.dumps({"name": "John", "age": 25}, use_bin_type=True)
    await broker.publish(body, "test")

Using Msgpack is much simpler than using Protobuf because there is no schema to maintain. Therefore, unless you have strict message size limits, Msgpack serialization is suitable for most cases.
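The reason Msgpack needs no schema is that every value carries its own type marker. The sketch below hand-encodes the example message for a tiny subset of the format (small maps, short strings, small non-negative integers), based on the Msgpack specification. The function name pack_small is hypothetical; in real code you would call msgpack.dumps as shown above.

```python
def pack_small(value) -> bytes:
    """Encode a tiny subset of Msgpack: small dicts, short strings, small ints."""
    if isinstance(value, dict) and len(value) < 16:
        out = bytes([0x80 | len(value)])  # fixmap: 1000xxxx, xxxx = entry count
        for key, item in value.items():
            out += pack_small(key) + pack_small(item)
        return out
    if isinstance(value, str):
        raw = value.encode("utf-8")
        if len(raw) < 32:
            return bytes([0xA0 | len(raw)]) + raw  # fixstr: 101xxxxx + bytes
    if isinstance(value, int) and not isinstance(value, bool) and 0 <= value < 128:
        return bytes([value])  # positive fixint: the value itself
    raise ValueError(f"value not supported by this sketch: {value!r}")


body = pack_small({"name": "John", "age": 25})
print(body)       # b'\x82\xa4name\xa4John\xa3age\x19'
print(len(body))  # 16
```

The field names are still present in the payload, which explains why the result is larger than Protobuf but smaller than JSON: only the quotes, separators, and textual digits are saved.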

Avro#

In this section, let's explore how to encode and decode messages with Avro in a FastStream application.

Apache Avro uses JSON to define data types and protocols and serializes data in a compact binary format. Avro utilizes a schema to structure the data that is being encoded. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed).

To get started, install the necessary dependencies:

pip install fastavro

Next, let's define the schema for our message. You can either define it in the Python file itself as:

person_schema = {
    "type": "record",
    "namespace": "Person",
    "name": "Person",
    "fields": [
        {"doc": "Name", "type": "string", "name": "name"},
        {"doc": "Age", "type": "int", "name": "age"},
    ],
}

Or you can load the schema from an avsc file as:

person_schema = fastavro.schema.load_schema("person.avsc")

The contents of the person.avsc file are:

person.avsc
{
    "type": "record",
    "namespace": "Person",
    "name": "Person",
    "fields": [
        {"doc": "Name", "type": "string", "name": "name"},
        {"doc": "Age", "type": "int", "name": "age"}
    ]
}

Finally, let's use fastavro's schemaless_reader and schemaless_writer to decode and encode messages in the FastStream app.

import io

import fastavro

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, KafkaMessage

broker = KafkaBroker()
app = FastStream(broker)


# person_schema = ...
schema = fastavro.schema.parse_schema(person_schema)

async def decode_message(msg: KafkaMessage):
    bytes_reader = io.BytesIO(msg.body)
    msg_dict = fastavro.schemaless_reader(bytes_reader, schema)
    return msg_dict


@broker.subscriber("test", decoder=decode_message)
async def consume(name: str, age: int, logger: Logger):
    logger.info(f"{name}: {age}")


@app.after_startup
async def publish():
    msg = {"name": "John", "age": 25}

    bytes_writer = io.BytesIO()
    fastavro.schemaless_writer(bytes_writer, schema, msg)
    raw_bytes = bytes_writer.getvalue()

    await broker.publish(raw_bytes, "test")
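A schemaless Avro payload contains neither field names nor type markers, so the reader must use the same schema as the writer. The following sketch hand-encodes the Person record according to the binary encoding rules in the Avro specification (a string is a zigzag varint length followed by UTF-8 bytes, an int is a zigzag varint, and a record is its fields concatenated in schema order). The helper names zigzag_varint and encode_person are hypothetical, and this illustrates the layout rather than how fastavro is implemented.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an integer as Avro does for int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps small negatives to small positives
    out = bytearray()
    while True:
        low_bits = z & 0x7F
        z >>= 7
        if z:
            out.append(low_bits | 0x80)  # more bytes follow
        else:
            out.append(low_bits)
            return bytes(out)


def encode_person(name: str, age: int) -> bytes:
    """Encode the Person record: fields in schema order, no names, no tags."""
    raw = name.encode("utf-8")
    return zigzag_varint(len(raw)) + raw + zigzag_varint(age)


avro_body = encode_person("John", 25)
print(avro_body)       # b'\x08John2'
print(len(avro_body))  # 6
```

Because nothing in these six bytes identifies the fields, decoding with a different schema would either fail or silently produce wrong values.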

Tips#

Data Compression#

If you are dealing with very large messages, consider compressing them as well. You can explore libraries such as lz4 or zstd for compression algorithms.

Compression can significantly reduce message size, especially if there are repeated blocks. However, in the case of small message bodies, data compression may increase the message size. Therefore, you should assess the compression impact based on your specific application requirements.
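The effect of payload size on compression can be checked with a short experiment. This sketch uses zlib from the standard library as a stand-in for lz4 or zstd (an assumption made to keep the example dependency-free). Exact numbers will differ between algorithms and settings, but the general pattern should be similar.

```python
import json
import zlib

# A single small message versus a large payload with many repeated blocks.
small = json.dumps({"name": "John", "age": 25}).encode()
large = json.dumps([{"name": "John", "age": 25}] * 1000).encode()

small_packed = zlib.compress(small)
large_packed = zlib.compress(large)

# The small body grows because of the fixed header and checksum overhead.
print(f"small: {len(small)} -> {len(small_packed)} bytes")
# The repetitive body shrinks dramatically.
print(f"large: {len(large)} -> {len(large_packed)} bytes")

# Decompression restores the original bytes exactly.
assert zlib.decompress(large_packed) == large
```

Measuring on a sample of your real messages, as done here, is a simple way to decide whether compression is worth enabling.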

Broker-Level Serialization#

You can also set a custom decoder at the Broker or Router level, so that it applies to every subscriber registered there. However, if you want outgoing messages to be encoded automatically as well, consider implementing serialization as a Middleware.


  1. For example, the message {"name": "John", "age": 25} takes 27 bytes in JSON, while in Protobuf it takes only 11 bytes. With lists and more complex structures, the savings can be even more significant (up to 20x).

  2. The same message, {"name": "John", "age": 25}, takes 16 bytes with Msgpack serialization.