Skip to content

create_publisher

faststream.confluent.subscriber.factory.create_publisher #

create_publisher(
    *,
    batch: Literal[False],
    key: Optional[bytes],
    topic: str,
    partition: Optional[int],
    headers: Optional[Dict[str, str]],
    reply_to: str,
    broker_middlewares: Sequence[BrokerMiddleware[Message]],
    middlewares: Sequence[PublisherMiddleware],
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
    autoflush: bool = False,
) -> AsyncAPIDefaultPublisher
create_publisher(
    *,
    batch: Literal[True],
    key: Optional[bytes],
    topic: str,
    partition: Optional[int],
    headers: Optional[Dict[str, str]],
    reply_to: str,
    broker_middlewares: Sequence[
        BrokerMiddleware[Tuple[Message, ...]]
    ],
    middlewares: Sequence[PublisherMiddleware],
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
    autoflush: bool = False,
) -> AsyncAPIBatchPublisher
create_publisher(
    *,
    batch: bool,
    key: Optional[bytes],
    topic: str,
    partition: Optional[int],
    headers: Optional[Dict[str, str]],
    reply_to: str,
    broker_middlewares: Union[
        Sequence[BrokerMiddleware[Tuple[Message, ...]]],
        Sequence[BrokerMiddleware[Message]],
    ],
    middlewares: Sequence[PublisherMiddleware],
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
    autoflush: bool = False,
) -> Union[
    AsyncAPIBatchPublisher, AsyncAPIDefaultPublisher
]
create_publisher(
    *,
    batch,
    key,
    topic,
    partition,
    headers,
    reply_to,
    broker_middlewares,
    middlewares,
    schema_,
    title_,
    description_,
    include_in_schema,
    autoflush=False,
)
Source code in faststream/confluent/subscriber/factory.py
def create_publisher(
    *,
    batch: bool,
    key: Optional[bytes],
    topic: str,
    partition: Optional[int],
    headers: Optional[Dict[str, str]],
    reply_to: str,
    # Publisher args
    broker_middlewares: Union[
        Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"],
        Sequence["BrokerMiddleware[ConfluentMsg]"],
    ],
    middlewares: Sequence["PublisherMiddleware"],
    # AsyncAPI args
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
    autoflush: bool = False,
) -> Union[
    "AsyncAPIBatchPublisher",
    "AsyncAPIDefaultPublisher",
]:
    publisher: Union[AsyncAPIBatchPublisher, AsyncAPIDefaultPublisher]
    if batch:
        if key:
            raise SetupError("You can't setup `key` with batch publisher")

        publisher = AsyncAPIBatchPublisher(
            topic=topic,
            partition=partition,
            headers=headers,
            reply_to=reply_to,
            broker_middlewares=cast(
                "Sequence[BrokerMiddleware[Tuple[ConfluentMsg, ...]]]",
                broker_middlewares,
            ),
            middlewares=middlewares,
            schema_=schema_,
            title_=title_,
            description_=description_,
            include_in_schema=include_in_schema,
        )
    else:
        publisher = AsyncAPIDefaultPublisher(
            key=key,
            # basic args
            topic=topic,
            partition=partition,
            headers=headers,
            reply_to=reply_to,
            broker_middlewares=cast(
                "Sequence[BrokerMiddleware[ConfluentMsg]]",
                broker_middlewares,
            ),
            middlewares=middlewares,
            schema_=schema_,
            title_=title_,
            description_=description_,
            include_in_schema=include_in_schema,
        )

    if autoflush:
        default_publish: Callable[..., Awaitable[Optional[Any]]] = publisher.publish

        @wraps(default_publish)
        async def autoflush_wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
            result = await default_publish(*args, **kwargs)
            await publisher.flush()
            return result

        publisher.publish = autoflush_wrapper  # type: ignore[method-assign]

    return publisher