Skip to content

AioPikaParser

faststream.rabbit.parser.AioPikaParser #

AioPikaParser(pattern=None)

A class for parsing, encoding, and decoding messages using aio-pika.

Source code in faststream/rabbit/parser.py
def __init__(self, pattern: Optional["Pattern[str]"] = None) -> None:
    self.pattern = pattern

pattern instance-attribute #

pattern = pattern

parse_message async #

parse_message(message)

Parses an incoming message and returns a RabbitMessage object.

Source code in faststream/rabbit/parser.py
async def parse_message(
    self,
    message: "IncomingMessage",
) -> StreamMessage["IncomingMessage"]:
    """Parses an incoming message and returns a RabbitMessage object."""
    if (path_re := self.pattern) and (
        match := path_re.match(message.routing_key or "")
    ):
        path = match.groupdict()
    else:
        path = {}

    return RabbitMessage(
        body=message.body,
        headers=message.headers,
        reply_to=message.reply_to or "",
        content_type=message.content_type,
        message_id=message.message_id or gen_cor_id(),
        correlation_id=message.correlation_id or gen_cor_id(),
        path=path,
        raw_message=message,
    )

decode_message async #

decode_message(msg)

Decode a message.

Source code in faststream/rabbit/parser.py
async def decode_message(
    self,
    msg: StreamMessage["IncomingMessage"],
) -> "DecodedMessage":
    """Decode a message."""
    return decode_message(msg)

encode_message staticmethod #

encode_message(message, *, persist, reply_to, headers, content_type, content_encoding, priority, correlation_id, expiration, message_id, timestamp, message_type, user_id, app_id)

Encodes a message for sending using AioPika.

Source code in faststream/rabbit/parser.py
@staticmethod
def encode_message(
    message: "AioPikaSendableMessage",
    *,
    persist: bool,
    reply_to: Optional[str],
    headers: Optional["HeadersType"],
    content_type: Optional[str],
    content_encoding: Optional[str],
    priority: Optional[int],
    correlation_id: Optional[str],
    expiration: Optional["DateType"],
    message_id: Optional[str],
    timestamp: Optional["DateType"],
    message_type: Optional[str],
    user_id: Optional[str],
    app_id: Optional[str],
) -> Message:
    """Encodes a message for sending using AioPika."""
    if isinstance(message, Message):
        return message

    else:
        message_body, generated_content_type = encode_message(message)

        delivery_mode = (
            DeliveryMode.PERSISTENT if persist else DeliveryMode.NOT_PERSISTENT
        )

        return Message(
            message_body,
            content_type=content_type or generated_content_type,
            delivery_mode=delivery_mode,
            reply_to=reply_to,
            correlation_id=correlation_id or gen_cor_id(),
            headers=headers,
            content_encoding=content_encoding,
            priority=priority,
            expiration=expiration,
            message_id=message_id,
            timestamp=timestamp,
            type=message_type,
            user_id=user_id,
            app_id=app_id,
        )