Custom Parser

At this stage, FastStream converts an incoming message from the broker library's native format into a general format called StreamMessage. During this stage, the message body remains raw bytes.
StreamMessage is the general representation of a message within FastStream. It contains all the information FastStream needs to process the message. It is even used to represent message batches, so the primary reason to customize it is to redefine the metadata associated with FastStream messages.
For example, you can designate your own header to carry message_id semantics. Customizing the parser is how you tell FastStream to use that header.
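To make the idea concrete, here is a minimal sketch of a parsing stage that fills in metadata while leaving the body as raw bytes. `RawRecord` and `ParsedMessage` are hypothetical stand-ins for a broker-native message and a StreamMessage-like object (they are not FastStream classes), and `decode` stands for whatever later step interprets the bytes, assumed here to be JSON:

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class RawRecord:
    """Hypothetical stand-in for a broker-native message."""

    value: bytes
    headers: Dict[str, str] = field(default_factory=dict)


@dataclass
class ParsedMessage:
    """Hypothetical stand-in for a StreamMessage-like object."""

    body: bytes
    headers: Dict[str, str]
    message_id: str


def parse(record: RawRecord) -> ParsedMessage:
    # Parsing stage: collect metadata, keep the body as raw bytes.
    return ParsedMessage(
        body=record.value,
        headers=dict(record.headers),
        message_id=record.headers.get("custom_message_id", ""),
    )


def decode(message: ParsedMessage) -> Any:
    # Later step (assumed to be JSON here): interpret the raw bytes.
    return json.loads(message.body)


record = RawRecord(value=b'{"user": "john"}', headers={"custom_message_id": "1"})
parsed = parse(record)
print(type(parsed.body).__name__, parsed.message_id)  # bytes 1
print(decode(parsed))  # {'user': 'john'}
```

The point of the split is that metadata such as the message id can be adjusted without touching how the payload is interpreted.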
Signature

To create a custom message parser, write a regular Python function (synchronous or asynchronous) with the following signature:
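A minimal sketch of this single-argument form, written both as a plain function and as a coroutine. `RawRecord` and `ParsedMessage` are hypothetical stand-ins for the broker's message type (for example `ConsumerRecord`) and the FastStream message type (for example `KafkaMessage`), and `call_parser` is an illustrative helper showing one way a caller can accept either kind; it is not FastStream's internal code:

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Awaitable, Callable, Union


@dataclass
class RawRecord:
    """Hypothetical stand-in for a broker-native message."""

    value: bytes


@dataclass
class ParsedMessage:
    """Hypothetical stand-in for a StreamMessage-like object."""

    body: bytes


def sync_parser(msg: RawRecord) -> ParsedMessage:
    # Synchronous form: takes the raw message, returns the parsed one.
    return ParsedMessage(body=msg.value)


async def async_parser(msg: RawRecord) -> ParsedMessage:
    # Asynchronous form: same signature, declared with `async def`.
    return ParsedMessage(body=msg.value)


Parser = Callable[[RawRecord], Union[ParsedMessage, Awaitable[ParsedMessage]]]


async def call_parser(parser: Parser, msg: RawRecord) -> ParsedMessage:
    # Illustrative helper: await the result only when the parser is async.
    result = parser(msg)
    if inspect.isawaitable(result):
        result = await result
    return result


async def main():
    record = RawRecord(b"payload")
    return (
        await call_parser(sync_parser, record),
        await call_parser(async_parser, record),
    )


from_sync, from_async = asyncio.run(main())
print(from_sync == from_async)  # True
```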
Alternatively, you can reuse the original parser function with the following signature:
Kafka

```python
from typing import Awaitable, Callable

from aiokafka import ConsumerRecord
from faststream.kafka import KafkaMessage


async def parser(
    msg: ConsumerRecord,
    original_parser: Callable[[ConsumerRecord], Awaitable[KafkaMessage]],
) -> KafkaMessage:
    # Delegate to the original parser and return its result unchanged
    return await original_parser(msg)
```

RabbitMQ

```python
from typing import Awaitable, Callable

from aio_pika import IncomingMessage
from faststream.rabbit import RabbitMessage


async def parser(
    msg: IncomingMessage,
    original_parser: Callable[[IncomingMessage], Awaitable[RabbitMessage]],
) -> RabbitMessage:
    # Delegate to the original parser and return its result unchanged
    return await original_parser(msg)
```

NATS

```python
from typing import Awaitable, Callable

from nats.aio.msg import Msg
from faststream.nats import NatsMessage


async def parser(
    msg: Msg,
    original_parser: Callable[[Msg], Awaitable[NatsMessage]],
) -> NatsMessage:
    # Delegate to the original parser and return its result unchanged
    return await original_parser(msg)
```
The argument names don't matter; the original parser is always passed as the second positional argument.
Note
The original parser is always an asynchronous function, so a custom parser that calls it should also be asynchronous.
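The sketch below illustrates both points: the wrapper uses arbitrary argument names, and it is a coroutine because it awaits the original parser. `RawRecord`, `ParsedMessage`, and `default_parser` are hypothetical stand-ins so the snippet runs without a broker library, and the call in `main` only imitates a call site that passes the message first and the original parser second:

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict


@dataclass
class RawRecord:
    """Hypothetical stand-in for a broker-native message."""

    value: bytes
    headers: Dict[str, str] = field(default_factory=dict)


@dataclass
class ParsedMessage:
    """Hypothetical stand-in for a StreamMessage-like object."""

    body: bytes
    headers: Dict[str, str]


async def default_parser(record: RawRecord) -> ParsedMessage:
    # Stand-in for the original (always asynchronous) parser.
    return ParsedMessage(body=record.value, headers=dict(record.headers))


async def my_parser(
    raw: RawRecord,
    fallback: Callable[[RawRecord], Awaitable[ParsedMessage]],
) -> ParsedMessage:
    # The names `raw` and `fallback` are arbitrary; only the positions matter.
    parsed = await fallback(raw)
    parsed.headers["seen_by"] = "my_parser"
    return parsed


async def main() -> ParsedMessage:
    record = RawRecord(b"hello")
    # Imitated call site: message first, original parser second.
    return await my_parser(record, default_parser)


result = asyncio.run(main())
print(result.headers)  # {'seen_by': 'my_parser'}
```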
Afterward, you can set this custom parser at the broker or subscriber level.
Example

As an example, let's take message_id from a custom header:
Kafka

```python
from typing import Awaitable, Callable

from aiokafka import ConsumerRecord
from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaMessage


async def custom_parser(
    msg: ConsumerRecord,
    original_parser: Callable[[ConsumerRecord], Awaitable[KafkaMessage]],
) -> KafkaMessage:
    parsed_msg = await original_parser(msg)
    # Take message_id from the custom header
    parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
    return parsed_msg


# Set the parser at the broker level
broker = KafkaBroker(parser=custom_parser)
app = FastStream(broker)


@broker.subscriber("test")
async def handle():
    ...


@app.after_startup
async def test():
    # Publish a message that carries the custom header
    await broker.publish("", "test", headers={"custom_message_id": "1"})
```

RabbitMQ

```python
from typing import Awaitable, Callable

from aio_pika import IncomingMessage
from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitMessage


async def custom_parser(
    msg: IncomingMessage,
    original_parser: Callable[[IncomingMessage], Awaitable[RabbitMessage]],
) -> RabbitMessage:
    parsed_msg = await original_parser(msg)
    # Take message_id from the custom header
    parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
    return parsed_msg


# Set the parser at the broker level
broker = RabbitBroker(parser=custom_parser)
app = FastStream(broker)


@broker.subscriber("test")
async def handle():
    ...


@app.after_startup
async def test():
    # Publish a message that carries the custom header
    await broker.publish("", "test", headers={"custom_message_id": "1"})
```

NATS

```python
from typing import Awaitable, Callable

from nats.aio.msg import Msg
from faststream import FastStream
from faststream.nats import NatsBroker, NatsMessage


async def custom_parser(
    msg: Msg,
    original_parser: Callable[[Msg], Awaitable[NatsMessage]],
) -> NatsMessage:
    parsed_msg = await original_parser(msg)
    # Take message_id from the custom header
    parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
    return parsed_msg


# Set the parser at the broker level
broker = NatsBroker(parser=custom_parser)
app = FastStream(broker)


@broker.subscriber("test")
async def handle():
    ...


@app.after_startup
async def test():
    # Publish a message that carries the custom header
    await broker.publish("", "test", headers={"custom_message_id": "1"})
```
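Indexing the headers with `["custom_message_id"]` raises `KeyError` for any message published without that header. A possible variation, sketched here with hypothetical stand-in classes (`RawRecord`, `ParsedMessage`, `default_parser`) instead of real broker types, keeps the id set by the original parser when the header is absent:

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict


@dataclass
class RawRecord:
    """Hypothetical stand-in for a broker-native message."""

    value: bytes
    headers: Dict[str, str] = field(default_factory=dict)


@dataclass
class ParsedMessage:
    """Hypothetical stand-in for a StreamMessage-like object."""

    body: bytes
    headers: Dict[str, str]
    message_id: str


async def default_parser(record: RawRecord) -> ParsedMessage:
    # Stand-in for the original parser, which assigns its own id.
    return ParsedMessage(
        body=record.value,
        headers=dict(record.headers),
        message_id="generated-id",
    )


async def custom_parser(
    msg: RawRecord,
    original_parser: Callable[[RawRecord], Awaitable[ParsedMessage]],
) -> ParsedMessage:
    parsed_msg = await original_parser(msg)
    # Use the custom header when present, otherwise keep the existing id.
    parsed_msg.message_id = parsed_msg.headers.get(
        "custom_message_id", parsed_msg.message_id
    )
    return parsed_msg


with_header = asyncio.run(
    custom_parser(RawRecord(b"", {"custom_message_id": "1"}), default_parser)
)
without_header = asyncio.run(custom_parser(RawRecord(b""), default_parser))
print(with_header.message_id)  # 1
print(without_header.message_id)  # generated-id
```

Whether a silent fallback or a hard failure is preferable depends on how strictly producers are expected to set the header.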
Last update: 2023-09-21