def __init__(
self,
subject: Annotated[
str,
Doc("NATS subject to send message."),
],
*,
headers: Annotated[
Optional[Dict[str, str]],
Doc(
"Message headers to store metainformation. "
"**content-type** and **correlation_id** will be set automatically by framework anyway. "
"Can be overridden by `publish.headers` if specified."
),
] = None,
reply_to: Annotated[
str,
Doc("NATS subject name to send response."),
] = "",
# JS
stream: Annotated[
Union[str, "JStream", None],
Doc(
"This option validates that the target `subject` is in presented stream. "
"Can be omitted without any effect."
),
] = None,
timeout: Annotated[
Optional[float],
Doc("Timeout to send message to NATS."),
] = None,
# basic args
middlewares: Annotated[
Sequence["PublisherMiddleware"],
Doc("Publisher middlewares to wrap outgoing messages."),
] = (),
# AsyncAPI information
title: Annotated[
Optional[str],
Doc("AsyncAPI publisher object title."),
] = None,
description: Annotated[
Optional[str],
Doc("AsyncAPI publisher object description."),
] = None,
schema: Annotated[
Optional[Any],
Doc(
"AsyncAPI publishing message type. "
"Should be any python-native object annotation or `pydantic.BaseModel`."
),
] = None,
include_in_schema: Annotated[
bool,
Doc("Whetever to include operation in AsyncAPI schema or not."),
] = True,
) -> None:
super().__init__(
subject=subject,
headers=headers,
reply_to=reply_to,
stream=stream,
timeout=timeout,
middlewares=middlewares,
title=title,
description=description,
schema=schema,
include_in_schema=include_in_schema,
)