Skip to content

build_message

faststream.rabbit.testing.build_message

build_message(message='', queue='', exchange=None, *, routing_key='', persist=False, reply_to=None, headers=None, content_type=None, content_encoding=None, priority=None, correlation_id=None, expiration=None, message_id=None, timestamp=None, message_type=None, user_id=None, app_id=None)

Build a patched RabbitMQ message for testing.

Source code in faststream/rabbit/testing.py
def build_message(
    message: "AioPikaSendableMessage" = "",
    queue: Union["RabbitQueue", str] = "",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    persist: bool = False,
    reply_to: Optional[str] = None,
    headers: Optional["HeadersType"] = None,
    content_type: Optional[str] = None,
    content_encoding: Optional[str] = None,
    priority: Optional[int] = None,
    correlation_id: Optional[str] = None,
    expiration: Optional["DateType"] = None,
    message_id: Optional[str] = None,
    timestamp: Optional["DateType"] = None,
    message_type: Optional[str] = None,
    user_id: Optional[str] = None,
    app_id: Optional[str] = None,
) -> PatchedMessage:
    """Build a patched RabbitMQ message for testing.

    The payload is encoded with ``AioPikaParser.encode_message`` and then
    wrapped in an ``aiormq`` ``DeliveredMessage`` whose channel is an
    ``AsyncMock``, so no broker connection is involved.

    Args:
        message: Payload handed to ``AioPikaParser.encode_message``.
        queue: Queue (or queue name) passed through ``RabbitQueue.validate``;
            its ``routing`` attribute is the fallback routing key.
        exchange: Exchange (or exchange name) passed through
            ``RabbitExchange.validate``; its ``name`` is used for the
            delivery, or ``""`` when the validated value has no ``name``.
        routing_key: Explicit routing key; when empty, the queue's
            ``routing`` value is used instead.
        message_id: Message identifier; when falsy, one is generated with
            ``gen_cor_id()``.
        message_type: Message type written to the header properties; when
            falsy, the ``type`` of the encoded message is used.
        persist, reply_to, headers, content_type, content_encoding,
        priority, correlation_id, expiration, timestamp, user_id, app_id:
            Forwarded unchanged to ``AioPikaParser.encode_message``.

    Returns:
        A ``PatchedMessage`` wrapping the constructed ``DeliveredMessage``.
    """
    que = RabbitQueue.validate(queue)
    exch = RabbitExchange.validate(exchange)

    # An explicit routing key wins; otherwise route by the queue.
    routing = routing_key or que.routing

    msg = AioPikaParser.encode_message(
        message=message,
        persist=persist,
        reply_to=reply_to,
        headers=headers,
        content_type=content_type,
        content_encoding=content_encoding,
        priority=priority,
        correlation_id=correlation_id,
        expiration=expiration,
        # Always give the test message an id, generating one if needed.
        message_id=message_id or gen_cor_id(),
        timestamp=timestamp,
        message_type=message_type,
        user_id=user_id,
        app_id=app_id,
    )

    # NOTE(review): `persist` and `expiration` reach the encoder above, but no
    # `delivery_mode` / `expiration` is set on Basic.Properties below, so they
    # do not appear in the delivered header - confirm whether that is intended.
    return PatchedMessage(
        aiormq.abc.DeliveredMessage(
            delivery=spec.Basic.Deliver(
                # `exch` may lack a `name` (e.g. no exchange given): use "".
                exchange=getattr(exch, "name", ""),
                routing_key=routing,
            ),
            header=ContentHeader(
                properties=spec.Basic.Properties(
                    content_type=msg.content_type,
                    headers=msg.headers,
                    reply_to=msg.reply_to,
                    content_encoding=msg.content_encoding,
                    priority=msg.priority,
                    correlation_id=msg.correlation_id,
                    message_id=msg.message_id,
                    timestamp=msg.timestamp,
                    # Every other property is read from the encoded message;
                    # fall back to its type too, so a type carried by `msg`
                    # is not lost when `message_type` is omitted.
                    message_type=message_type or msg.type,
                    user_id=msg.user_id,
                    app_id=msg.app_id,
                )
            ),
            body=msg.body,
            # Stand-in channel so ack/nack-style calls need no real broker.
            channel=AsyncMock(),
        ),
    )