Skip to content

build_message

faststream.rabbit.test.build_message #

build_message(
    message: AioPikaSendableMessage = "",
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    reply_to: Optional[str] = None,
    **message_kwargs: Any
) -> PatchedMessage

Build a patched RabbitMQ message for testing.

PARAMETER DESCRIPTION
message

The message content.

TYPE: AioPikaSendableMessage DEFAULT: ''

queue

The queue the message is addressed to. Its name is used as the routing key when routing_key is empty.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

The message exchange.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

The message routing key. When empty, the queue name is used instead.

TYPE: str DEFAULT: ''

reply_to

The reply-to queue.

TYPE: Optional[str] DEFAULT: None

**message_kwargs

Additional message arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
PatchedMessage

A patched RabbitMQ message.

TYPE: PatchedMessage

Source code in faststream/rabbit/test.py
def build_message(
    message: AioPikaSendableMessage = "",
    queue: Union[RabbitQueue, str] = "",
    exchange: Union[RabbitExchange, str, None] = None,
    *,
    routing_key: str = "",
    reply_to: Optional[str] = None,
    **message_kwargs: Any,
) -> PatchedMessage:
    """
    Assemble a fake delivered RabbitMQ message for use in tests.

    The payload is encoded via ``AioPikaParser.encode_message`` (always with
    ``persist=False`` and no callback queue) and then wrapped in an
    ``aiormq`` ``DeliveredMessage`` whose channel is an ``AsyncMock``.

    Args:
        message: The message content to encode.
        queue: Target queue; passed through ``RabbitQueue.validate``. Its
            ``name`` attribute (if any) is the fallback routing key.
        exchange: Target exchange; passed through ``RabbitExchange.validate``.
            Its ``name`` attribute (or ``""`` if absent) is used as the
            delivery exchange.
        routing_key: Routing key for the delivery. When empty, the queue
            name is used instead.
        reply_to: Value placed in the ``reply_to`` message property.
        **message_kwargs: Extra keyword arguments forwarded to
            ``AioPikaParser.encode_message``.

    Returns:
        A ``PatchedMessage`` wrapping the constructed delivery.
    """
    validated_queue = RabbitQueue.validate(queue)
    validated_exchange = RabbitExchange.validate(exchange)

    encoded = AioPikaParser.encode_message(
        message=message,
        persist=False,
        reply_to=reply_to,
        callback_queue=None,
        **message_kwargs,
    )

    # An explicit routing key wins; otherwise fall back to the queue name.
    if routing_key:
        effective_key = routing_key
    else:
        effective_key = getattr(validated_queue, "name", "")

    delivery = spec.Basic.Deliver(
        exchange=getattr(validated_exchange, "name", ""),
        routing_key=effective_key,
    )

    # Every built message gets a freshly generated message id.
    properties = spec.Basic.Properties(
        content_type=encoded.content_type,
        message_id=str(uuid4()),
        headers=encoded.headers,
        reply_to=reply_to,
    )

    delivered = aiormq.abc.DeliveredMessage(
        delivery=delivery,
        header=ContentHeader(properties=properties),
        body=encoded.body,
        channel=AsyncMock(),  # stand-in channel, no broker connection
    )
    return PatchedMessage(delivered)

Last update: 2023-11-13