def __init__(
    self,
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ],
    *,
    # message routing options
    key: Annotated[
        Union[bytes, Any, None],
        Doc(
            "\n"
            "A key to associate with the message. Can be used to\n"
            "determine which partition to send the message to. If partition\n"
            "is `None` (and producer's partitioner config is left as default),\n"
            "then messages with the same key will be delivered to the same\n"
            "partition (but if key is `None`, partition is chosen randomly).\n"
            "Must be type `bytes`, or be serializable to bytes via configured\n"
            "`key_serializer`.\n"
        ),
    ] = None,
    partition: Annotated[
        Optional[int],
        Doc(
            "\n"
            "Specify a partition. If not set, the partition will be\n"
            "selected using the configured `partitioner`.\n"
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Topic name to send response."),
    ] = "",
    batch: Annotated[
        bool,
        Doc("Whether to send messages in batches or not."),
    ] = False,
    # common publisher options
    middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI documentation options
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    # NOTE(review): "Whetever" below is a typo for "Whether"; it is kept
    # verbatim here because the Doc text is visible at runtime.
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> None:
    """Initialize the publisher by delegating to the parent initializer.

    Every argument is forwarded unchanged, by keyword, to
    ``super().__init__``; nothing is validated, transformed, or stored
    here. Only ``topic`` may be passed positionally; all other options
    are keyword-only. Per-parameter descriptions live in the ``Doc``
    metadata of each annotation.
    """
    # Keyword arguments are listed in the same order as the signature.
    super().__init__(
        topic=topic,
        # message routing options
        key=key,
        partition=partition,
        headers=headers,
        reply_to=reply_to,
        batch=batch,
        # common publisher options
        middlewares=middlewares,
        # AsyncAPI documentation options
        title=title,
        description=description,
        schema=schema,
        include_in_schema=include_in_schema,
    )