def build_message(
message: "AioPikaSendableMessage" = "",
queue: Union["RabbitQueue", str] = "",
exchange: Union["RabbitExchange", str, None] = None,
*,
routing_key: str = "",
persist: bool = False,
reply_to: Optional[str] = None,
headers: Optional["HeadersType"] = None,
content_type: Optional[str] = None,
content_encoding: Optional[str] = None,
priority: Optional[int] = None,
correlation_id: Optional[str] = None,
expiration: Optional["DateType"] = None,
message_id: Optional[str] = None,
timestamp: Optional["DateType"] = None,
message_type: Optional[str] = None,
user_id: Optional[str] = None,
app_id: Optional[str] = None,
) -> PatchedMessage:
"""Build a patched RabbitMQ message for testing."""
que = RabbitQueue.validate(queue)
exch = RabbitExchange.validate(exchange)
routing = routing_key or que.routing
msg = AioPikaParser.encode_message(
message=message,
persist=persist,
reply_to=reply_to,
headers=headers,
content_type=content_type,
content_encoding=content_encoding,
priority=priority,
correlation_id=correlation_id,
expiration=expiration,
message_id=message_id or gen_cor_id(),
timestamp=timestamp,
message_type=message_type,
user_id=user_id,
app_id=app_id,
)
return PatchedMessage(
aiormq.abc.DeliveredMessage(
delivery=spec.Basic.Deliver(
exchange=getattr(exch, "name", ""),
routing_key=routing,
),
header=ContentHeader(
properties=spec.Basic.Properties(
content_type=msg.content_type,
headers=msg.headers,
reply_to=msg.reply_to,
content_encoding=msg.content_encoding,
priority=msg.priority,
correlation_id=msg.correlation_id,
message_id=msg.message_id,
timestamp=msg.timestamp,
message_type=message_type,
user_id=msg.user_id,
app_id=msg.app_id,
)
),
body=msg.body,
channel=AsyncMock(),
),
)