def create_publisher(
*,
batch: bool,
key: Optional[bytes],
topic: str,
partition: Optional[int],
headers: Optional[Dict[str, str]],
reply_to: str,
# Publisher args
broker_middlewares: Union[
Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"],
Sequence["BrokerMiddleware[ConfluentMsg]"],
],
middlewares: Sequence["PublisherMiddleware"],
# AsyncAPI args
schema_: Optional[Any],
title_: Optional[str],
description_: Optional[str],
include_in_schema: bool,
autoflush: bool = False,
) -> Union[
"AsyncAPIBatchPublisher",
"AsyncAPIDefaultPublisher",
]:
publisher: Union[AsyncAPIBatchPublisher, AsyncAPIDefaultPublisher]
if batch:
if key:
raise SetupError("You can't setup `key` with batch publisher")
publisher = AsyncAPIBatchPublisher(
topic=topic,
partition=partition,
headers=headers,
reply_to=reply_to,
broker_middlewares=cast(
"Sequence[BrokerMiddleware[Tuple[ConfluentMsg, ...]]]",
broker_middlewares,
),
middlewares=middlewares,
schema_=schema_,
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
else:
publisher = AsyncAPIDefaultPublisher(
key=key,
# basic args
topic=topic,
partition=partition,
headers=headers,
reply_to=reply_to,
broker_middlewares=cast(
"Sequence[BrokerMiddleware[ConfluentMsg]]",
broker_middlewares,
),
middlewares=middlewares,
schema_=schema_,
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
if autoflush:
default_publish: Callable[..., Awaitable[Optional[Any]]] = publisher.publish
@wraps(default_publish)
async def autoflush_wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
result = await default_publish(*args, **kwargs)
await publisher.flush()
return result
publisher.publish = autoflush_wrapper # type: ignore[method-assign]
return publisher