def __init__(
    self,
    channel: Annotated[
        Union["PubSub", str, None],
        Doc("Redis PubSub object name to send message."),
    ] = None,
    *,
    # NOTE: `list` shadows the builtin inside this method. The name is part
    # of the public keyword-only interface, so it is kept as-is; the body
    # never needs the builtin.
    list: Annotated[
        Union["ListSub", str, None],
        Doc("Redis List object name to send message."),
    ] = None,
    stream: Annotated[
        Union["StreamSub", str, None],
        Doc("Redis Stream object name to send message."),
    ] = None,
    headers: Annotated[
        Optional["AnyDict"],
        Doc(
            "Message headers to store metainformation. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Reply message destination PubSub object name."),
    ] = "",
    middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whether to include operation in AsyncAPI schema or not."),
    ] = True,
) -> None:
    """Initialize the publisher by delegating to the parent initializer.

    Every argument is forwarded unchanged, by keyword, to
    ``super().__init__``; this method adds no logic of its own. Only
    ``channel`` may be passed positionally, all other options are
    keyword-only. Per-parameter documentation lives in the ``Doc``
    annotations of the signature.
    """
    super().__init__(
        channel=channel,
        list=list,
        stream=stream,
        headers=headers,
        reply_to=reply_to,
        middlewares=middlewares,
        title=title,
        description=description,
        schema=schema,
        include_in_schema=include_in_schema,
    )