Skip to content

Custom Parser#

At this stage, FastStream converts an incoming message from the broker library's native format into a general format called StreamMessage. During this stage, the message body remains in the form of raw bytes.

StreamMessage is a general representation of a message within FastStream. It contains all the information required for message processing within FastStream. It is even used to represent message batches, so the primary reason to customize it is to redefine the metadata associated with FastStream messages.

For example, you may carry the message ID in a header of your own instead of the broker's default field. Customizing the parser lets you tell FastStream to read message_id from that custom header.

Signature#

To create a custom message parser, you should write a regular Python function (synchronous or asynchronous) with the following signature:

from aiokafka import ConsumerRecord
from faststream.kafka import KafkaMessage

def parser(msg: ConsumerRecord) -> KafkaMessage:
    """Build a ``KafkaMessage`` from the raw aiokafka ``ConsumerRecord``."""
    # Placeholder body: the conversion logic is supplied by the user.
    ...
from confluent_kafka import Message
from faststream.confluent import KafkaMessage

def parser(msg: Message) -> KafkaMessage:
    """Build a ``KafkaMessage`` from the raw confluent_kafka ``Message``."""
    # Placeholder body: the conversion logic is supplied by the user.
    ...
from aio_pika import IncomingMessage
from faststream.rabbit import RabbitMessage

def parser(msg: IncomingMessage) -> RabbitMessage:
    """Build a ``RabbitMessage`` from the raw aio_pika ``IncomingMessage``."""
    # Placeholder body: the conversion logic is supplied by the user.
    ...
from nats.aio.msg import Msg
from faststream.nats import NatsMessage

def parser(msg: Msg) -> NatsMessage:
    """Build a ``NatsMessage`` from the raw nats ``Msg``."""
    # Placeholder body: the conversion logic is supplied by the user.
    ...
from faststream.redis import RedisMessage
from faststream.redis.message import PubSubMessage

def parser(msg: PubSubMessage) -> RedisMessage:
    """Build a ``RedisMessage`` from the raw ``PubSubMessage``."""
    # Placeholder body: the conversion logic is supplied by the user.
    ...

Alternatively, you can reuse the original parser function with the following signature:

from typing import Awaitable, Callable

from aiokafka import ConsumerRecord
from faststream.kafka import KafkaMessage

async def parser(
    msg: ConsumerRecord,
    original_parser: Callable[[ConsumerRecord], Awaitable[KafkaMessage]],
) -> KafkaMessage:
    """Delegate parsing of ``msg`` to FastStream's original parser.

    Args:
        msg: Raw record received from the Kafka client.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The message produced by ``original_parser``.
    """
    # The original parser returns an awaitable, so this function must be async.
    return await original_parser(msg)
from typing import Awaitable, Callable

from confluent_kafka import Message
from faststream.confluent import KafkaMessage

async def parser(
    msg: Message,
    original_parser: Callable[[Message], Awaitable[KafkaMessage]],
) -> KafkaMessage:
    """Delegate parsing of ``msg`` to FastStream's original parser.

    Args:
        msg: Raw message received from the Confluent Kafka client.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The message produced by ``original_parser``.
    """
    # The original parser returns an awaitable, so this function must be async.
    return await original_parser(msg)
from typing import Awaitable, Callable

from aio_pika import IncomingMessage
from faststream.rabbit import RabbitMessage

async def parser(
    msg: IncomingMessage,
    original_parser: Callable[[IncomingMessage], Awaitable[RabbitMessage]],
) -> RabbitMessage:
    """Delegate parsing of ``msg`` to FastStream's original parser.

    Args:
        msg: Raw message received from aio_pika.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The message produced by ``original_parser``.
    """
    # The original parser returns an awaitable, so this function must be async.
    return await original_parser(msg)
from typing import Awaitable, Callable

from nats.aio.msg import Msg
from faststream.nats import NatsMessage

async def parser(
    msg: Msg,
    original_parser: Callable[[Msg], Awaitable[NatsMessage]],
) -> NatsMessage:
    """Delegate parsing of ``msg`` to FastStream's original parser.

    Args:
        msg: Raw message received from the NATS client.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The message produced by ``original_parser``.
    """
    # The original parser returns an awaitable, so this function must be async.
    return await original_parser(msg)
from typing import Awaitable, Callable

from faststream.redis import RedisMessage
from faststream.redis.message import PubSubMessage

async def parser(
    msg: PubSubMessage,
    original_parser: Callable[[PubSubMessage], Awaitable[RedisMessage]],
) -> RedisMessage:
    """Delegate parsing of ``msg`` to FastStream's original parser.

    Args:
        msg: Raw Pub/Sub message received from Redis.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The message produced by ``original_parser``.
    """
    # The original parser returns an awaitable, so this function must be async.
    return await original_parser(msg)

The argument names don't matter; the original parser is always passed as the second positional argument.

Note

The original parser is always an asynchronous function, so a custom parser that calls it must also be asynchronous in order to await its result.

Afterward, you can set this custom parser at the broker or subscriber level.

Example#

As an example, let's redefine message_id to a custom header:

from typing import Awaitable, Callable

from aiokafka import ConsumerRecord

from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaMessage


async def custom_parser(
    msg: ConsumerRecord,
    original_parser: Callable[[ConsumerRecord], Awaitable[KafkaMessage]],
) -> KafkaMessage:
    """Parse ``msg`` with the original parser, then take ``message_id`` from a header.

    Args:
        msg: Raw record received from the Kafka client.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The parsed message with ``message_id`` set from the
        ``custom_message_id`` header when that header is present.
    """
    parsed_msg = await original_parser(msg)
    # Fall back to the ID set by the original parser so that a message
    # without the header does not end up with ``message_id`` set to None.
    parsed_msg.message_id = parsed_msg.headers.get(
        "custom_message_id",
        parsed_msg.message_id,
    )
    return parsed_msg


# Register the custom parser at the broker level.
broker = KafkaBroker(parser=custom_parser)
app = FastStream(broker)


@broker.subscriber("test")
async def handle():
    """Consume messages from "test"; the body is intentionally empty."""
    ...


@app.after_startup
async def test():
    """Publish one message that carries the custom ID header."""
    await broker.publish("", "test", headers={"custom_message_id": "1"})
from typing import Awaitable, Callable

from confluent_kafka import Message

from faststream import FastStream
from faststream.confluent import KafkaBroker, KafkaMessage


async def custom_parser(
    msg: Message,
    original_parser: Callable[[Message], Awaitable[KafkaMessage]],
) -> KafkaMessage:
    """Parse ``msg`` with the original parser, then take ``message_id`` from a header.

    Args:
        msg: Raw message received from the Confluent Kafka client.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The parsed message with ``message_id`` set from the
        ``custom_message_id`` header when that header is present.
    """
    parsed_msg = await original_parser(msg)
    # Fall back to the ID set by the original parser so that a message
    # without the header does not end up with ``message_id`` set to None.
    parsed_msg.message_id = parsed_msg.headers.get(
        "custom_message_id",
        parsed_msg.message_id,
    )
    return parsed_msg


# Register the custom parser at the broker level.
broker = KafkaBroker(parser=custom_parser)
app = FastStream(broker)


@broker.subscriber("test")
async def handle():
    """Consume messages from "test"; the body is intentionally empty."""
    ...


@app.after_startup
async def test():
    """Publish one message that carries the custom ID header."""
    await broker.publish("", "test", headers={"custom_message_id": "1"})
from typing import Awaitable, Callable

from aio_pika import IncomingMessage

from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitMessage


async def custom_parser(
    msg: IncomingMessage,
    original_parser: Callable[[IncomingMessage], Awaitable[RabbitMessage]],
) -> RabbitMessage:
    """Parse ``msg`` with the original parser, then take ``message_id`` from a header.

    Args:
        msg: Raw message received from aio_pika.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The parsed message with ``message_id`` set from the
        ``custom_message_id`` header when that header is present.
    """
    parsed_msg = await original_parser(msg)
    # Fall back to the ID set by the original parser so that a message
    # without the header does not end up with ``message_id`` set to None.
    parsed_msg.message_id = parsed_msg.headers.get(
        "custom_message_id",
        parsed_msg.message_id,
    )
    return parsed_msg


# Register the custom parser at the broker level.
broker = RabbitBroker(parser=custom_parser)
app = FastStream(broker)


@broker.subscriber("test")
async def handle():
    """Consume messages from "test"; the body is intentionally empty."""
    ...


@app.after_startup
async def test():
    """Publish one message that carries the custom ID header."""
    await broker.publish("", "test", headers={"custom_message_id": "1"})
from typing import Awaitable, Callable

from nats.aio.msg import Msg

from faststream import FastStream
from faststream.nats import NatsBroker, NatsMessage


async def custom_parser(
    msg: Msg,
    original_parser: Callable[[Msg], Awaitable[NatsMessage]],
) -> NatsMessage:
    """Parse ``msg`` with the original parser, then take ``message_id`` from a header.

    Args:
        msg: Raw message received from the NATS client.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The parsed message with ``message_id`` set from the
        ``custom_message_id`` header when that header is present.
    """
    parsed_msg = await original_parser(msg)
    # Fall back to the ID set by the original parser so that a message
    # without the header does not end up with ``message_id`` set to None.
    parsed_msg.message_id = parsed_msg.headers.get(
        "custom_message_id",
        parsed_msg.message_id,
    )
    return parsed_msg


# Register the custom parser at the broker level.
broker = NatsBroker(parser=custom_parser)
app = FastStream(broker)


@broker.subscriber("test")
async def handle():
    """Consume messages from "test"; the body is intentionally empty."""
    ...


@app.after_startup
async def test():
    """Publish one message that carries the custom ID header."""
    await broker.publish("", "test", headers={"custom_message_id": "1"})
from typing import Awaitable, Callable

from faststream import FastStream
from faststream.redis import RedisBroker, RedisMessage
from faststream.redis.message import PubSubMessage


async def custom_parser(
    msg: PubSubMessage,
    original_parser: Callable[[PubSubMessage], Awaitable[RedisMessage]],
) -> RedisMessage:
    """Parse ``msg`` with the original parser, then take ``message_id`` from a header.

    Args:
        msg: Raw Pub/Sub message received from Redis.
        original_parser: The original parser, passed in as the second argument.

    Returns:
        The parsed message with ``message_id`` set from the
        ``custom_message_id`` header when that header is present.
    """
    parsed_msg = await original_parser(msg)
    # Fall back to the ID set by the original parser so that a message
    # without the header does not end up with ``message_id`` set to None.
    parsed_msg.message_id = parsed_msg.headers.get(
        "custom_message_id",
        parsed_msg.message_id,
    )
    return parsed_msg


# Register the custom parser at the broker level.
broker = RedisBroker(parser=custom_parser)
app = FastStream(broker)


@broker.subscriber("test")
async def handle():
    """Consume messages from "test"; the body is intentionally empty."""
    ...


@app.after_startup
async def test():
    """Publish one message that carries the custom ID header."""
    await broker.publish("", "test", headers={"custom_message_id": "1"})