def __init__(
self,
queue: Annotated[
Union["RabbitQueue", str],
Doc("Default message routing key to publish with."),
] = "",
exchange: Annotated[
Union["RabbitExchange", str, None],
Doc("Target exchange to publish message to."),
] = None,
*,
routing_key: Annotated[
str,
Doc(
"Default message routing key to publish with. "
"Overrides `queue` option if presented."
),
] = "",
mandatory: Annotated[
bool,
Doc(
"Client waits for confirmation that the message is placed to some queue. "
"RabbitMQ returns message to client if there is no suitable queue."
),
] = True,
immediate: Annotated[
bool,
Doc(
"Client expects that there is consumer ready to take the message to work. "
"RabbitMQ returns message to client if there is no suitable consumer."
),
] = False,
timeout: Annotated[
"TimeoutType",
Doc("Send confirmation time from RabbitMQ."),
] = None,
persist: Annotated[
bool,
Doc("Restore the message on RabbitMQ reboot."),
] = False,
reply_to: Annotated[
Optional[str],
Doc(
"Reply message routing key to send with (always sending to default exchange)."
),
] = None,
priority: Annotated[
Optional[int],
Doc("The message priority (0 by default)."),
] = None,
# basic args
middlewares: Annotated[
Iterable["PublisherMiddleware"],
Doc("Publisher middlewares to wrap outgoing messages."),
] = (),
# AsyncAPI args
title: Annotated[
Optional[str],
Doc("AsyncAPI publisher object title."),
] = None,
description: Annotated[
Optional[str],
Doc("AsyncAPI publisher object description."),
] = None,
schema: Annotated[
Optional[Any],
Doc(
"AsyncAPI publishing message type. "
"Should be any python-native object annotation or `pydantic.BaseModel`."
),
] = None,
include_in_schema: Annotated[
bool,
Doc("Whetever to include operation in AsyncAPI schema or not."),
] = True,
# message args
headers: Annotated[
Optional["HeadersType"],
Doc(
"Message headers to store metainformation. "
"Can be overridden by `publish.headers` if specified."
),
] = None,
content_type: Annotated[
Optional[str],
Doc(
"Message **content-type** header. "
"Used by application, not core RabbitMQ. "
"Will be set automatically if not specified."
),
] = None,
content_encoding: Annotated[
Optional[str],
Doc("Message body content encoding, e.g. **gzip**."),
] = None,
expiration: Annotated[
Optional["DateType"],
Doc("Message expiration (lifetime) in seconds (or datetime or timedelta)."),
] = None,
message_type: Annotated[
Optional[str],
Doc("Application-specific message type, e.g. **orders.created**."),
] = None,
user_id: Annotated[
Optional[str],
Doc("Publisher connection User ID, validated if set."),
] = None,
) -> None:
super().__init__(
queue=queue,
exchange=exchange,
routing_key=routing_key,
mandatory=mandatory,
immediate=immediate,
timeout=timeout,
persist=persist,
reply_to=reply_to,
priority=priority,
headers=headers,
content_type=content_type,
content_encoding=content_encoding,
expiration=expiration,
message_type=message_type,
user_id=user_id,
# basic args
middlewares=middlewares,
# AsyncAPI args
title=title,
description=description,
schema=schema,
include_in_schema=include_in_schema,
)