Skip to content

AsyncAPIBatchPublisher

faststream.kafka.publisher.asyncapi.AsyncAPIBatchPublisher #

AsyncAPIBatchPublisher(
    *,
    topic,
    partition,
    headers,
    reply_to,
    broker_middlewares,
    middlewares,
    schema_,
    title_,
    description_,
    include_in_schema,
)

Bases: BatchPublisher, AsyncAPIPublisher[Tuple['ConsumerRecord', ...]]

Source code in faststream/kafka/publisher/usecase.py
def __init__(
    self,
    *,
    topic: str,
    partition: Optional[int],
    headers: Optional[Dict[str, str]],
    reply_to: str,
    # Publisher args
    broker_middlewares: Sequence["BrokerMiddleware[MsgType]"],
    middlewares: Sequence["PublisherMiddleware"],
    # AsyncAPI args
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> None:
    """Initialize the publisher with its default publishing options.

    The middleware and AsyncAPI arguments are passed to the parent
    initializer unchanged. ``topic``, ``partition``, ``headers`` and
    ``reply_to`` are stored on the instance as given. No producer is
    attached at construction time (``_producer`` starts as ``None``).
    """
    super().__init__(
        # AsyncAPI args
        include_in_schema=include_in_schema,
        description_=description_,
        title_=title_,
        schema_=schema_,
        # Publisher args
        middlewares=middlewares,
        broker_middlewares=broker_middlewares,
    )

    # No producer yet: it is assigned separately after construction.
    self._producer = None

    # Per-publisher defaults.
    self.headers = headers
    self.reply_to = reply_to
    self.partition = partition
    self.topic = topic

title_ instance-attribute #

title_ = title_

description_ instance-attribute #

description_ = description_

include_in_schema instance-attribute #

include_in_schema = include_in_schema

name property #

name

Returns the name of the API operation.

description property #

description

Returns the description of the API operation.

schema_ instance-attribute #

schema_ = schema_

mock instance-attribute #

mock = None

calls instance-attribute #

calls = []

topic instance-attribute #

topic = topic

partition instance-attribute #

partition = partition

reply_to instance-attribute #

reply_to = reply_to

headers instance-attribute #

headers = headers

publish async #

publish(
    message,
    *extra_messages,
    topic="",
    partition=None,
    timestamp_ms=None,
    headers=None,
    reply_to="",
    correlation_id=None,
    no_confirm=False,
    _extra_middlewares=(),
)
Source code in faststream/kafka/publisher/usecase.py
@override
async def publish(
    self,
    message: Annotated[
        Union["SendableMessage", Iterable["SendableMessage"]],
        Doc("One message or iterable messages bodies to send."),
    ],
    *extra_messages: Annotated[
        "SendableMessage",
        Doc("Messages bodies to send."),
    ],
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ] = "",
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    timestamp_ms: Annotated[
        Optional[int],
        Doc(
            """
        Epoch milliseconds (from Jan 1 1970 UTC) to use as
        the message timestamp. Defaults to current time.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc("Messages headers to store metainformation."),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Reply message topic name to send response."),
    ] = "",
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    no_confirm: Annotated[
        bool,
        Doc("Do not wait for Kafka publish confirmation."),
    ] = False,
    # publisher specific
    _extra_middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Extra middlewares to wrap publishing process."),
    ] = (),
) -> None:
    """Publish a batch of messages through the middleware chain.

    The batch is given either as several positional messages or as a single
    iterable of messages. Falsy ``topic``, ``headers`` and ``reply_to``
    arguments, and a ``partition`` of ``None``, fall back to the values
    stored on the publisher. A missing ``correlation_id`` is generated.

    Raises:
        AssertionError: If no producer has been attached yet.
    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    msgs: Iterable[SendableMessage]
    if extra_messages:
        # Several positional messages: the first one is a single message too.
        msgs = (cast("SendableMessage", message), *extra_messages)
    else:
        # Only one positional argument: it is treated as the iterable of messages.
        msgs = cast(Iterable["SendableMessage"], message)

    topic = topic or self.topic
    # `0` is a valid partition number, so only `None` may trigger the fallback
    # (a plain `or` would silently replace an explicit partition 0).
    if partition is None:
        partition = self.partition
    headers = headers or self.headers
    reply_to = reply_to or self.reply_to
    correlation_id = correlation_id or gen_cor_id()

    call: AsyncFunc = self._producer.publish_batch

    # Wrap the producer call layer by layer: publisher middlewares first, then
    # the extra middlewares or, when none were given, each broker middleware's
    # `publish_scope`. Every wrapper encloses the previous ones.
    for pub_m in chain(
        self._middlewares[::-1],
        (
            _extra_middlewares
            or (m(None).publish_scope for m in self._broker_middlewares[::-1])
        ),
    ):
        call = partial(pub_m, call)

    await call(
        *msgs,
        topic=topic,
        partition=partition,
        headers=headers,
        reply_to=reply_to,
        correlation_id=correlation_id,
        timestamp_ms=timestamp_ms,
        no_confirm=no_confirm,
    )

request async #

request(
    message,
    topic="",
    *,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    correlation_id=None,
    timeout=0.5,
    _extra_middlewares=(),
)
Source code in faststream/kafka/publisher/usecase.py
@override
async def request(
    self,
    message: Annotated[
        "SendableMessage",
        Doc("Message body to send."),
    ],
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ] = "",
    *,
    key: Annotated[
        Union[bytes, Any, None],
        Doc(
            """
        A key to associate with the message. Can be used to
        determine which partition to send the message to. If partition
        is `None` (and producer's partitioner config is left as default),
        then messages with the same key will be delivered to the same
        partition (but if key is `None`, partition is chosen randomly).
        Must be type `bytes`, or be serializable to bytes via configured
        `key_serializer`.
        """
        ),
    ] = None,
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    timestamp_ms: Annotated[
        Optional[int],
        Doc(
            """
        Epoch milliseconds (from Jan 1 1970 UTC) to use as
        the message timestamp. Defaults to current time.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc("Message headers to store metainformation."),
    ] = None,
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    timeout: Annotated[
        float,
        Doc("Timeout to send RPC request."),
    ] = 0.5,
    # publisher specific
    _extra_middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Extra middlewares to wrap publishing process."),
    ] = (),
) -> "KafkaMessage":
    """Send a request through the middleware chain and return the parsed reply.

    Falsy ``topic`` and ``headers`` arguments, and a ``partition`` of
    ``None``, fall back to the values stored on the publisher. A missing
    ``correlation_id`` is generated. The message returned by the producer is
    parsed and decoded with the producer's ``_parser`` and ``_decoder``,
    marked as a response, and passed through each broker middleware's
    ``consume_scope`` before being returned.

    Raises:
        AssertionError: If no producer has been attached yet.
    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    topic = topic or self.topic
    # `0` is a valid partition number, so only `None` may trigger the fallback
    # (a plain `or` would silently replace an explicit partition 0).
    if partition is None:
        partition = self.partition
    headers = headers or self.headers
    correlation_id = correlation_id or gen_cor_id()

    request: AsyncFunc = self._producer.request

    # Wrap the producer call layer by layer: publisher middlewares first, then
    # the extra middlewares or, when none were given, each broker middleware's
    # `publish_scope`. Every wrapper encloses the previous ones.
    for pub_m in chain(
        self._middlewares[::-1],
        (
            _extra_middlewares
            or (m(None).publish_scope for m in self._broker_middlewares[::-1])
        ),
    ):
        request = partial(pub_m, request)

    published_msg = await request(
        message,
        topic=topic,
        key=key,
        partition=partition,
        headers=headers,
        timeout=timeout,
        correlation_id=correlation_id,
        timestamp_ms=timestamp_ms,
    )

    async with AsyncExitStack() as stack:
        # Start from the identity callable and wrap it with every broker
        # middleware's `consume_scope`; each middleware is instantiated with
        # the raw reply and entered as an async context first.
        return_msg: Callable[[KafkaMessage], Awaitable[KafkaMessage]] = return_input
        for m in self._broker_middlewares[::-1]:
            mid = m(published_msg)
            await stack.enter_async_context(mid)
            return_msg = partial(mid.consume_scope, return_msg)

        parsed_msg = await self._producer._parser(published_msg)
        parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
        parsed_msg._source_type = SourceType.Response
        return await return_msg(parsed_msg)

    # Only reachable if a middleware context suppresses an exception raised
    # inside the stack; otherwise the block above always returns.
    raise AssertionError("unreachable")

setup #

setup(*, producer)
Source code in faststream/broker/publisher/usecase.py
@override
def setup(  # type: ignore[override]
    self,
    *,
    producer: Optional["ProducerProto"],
) -> None:
    """Store ``producer`` on the publisher as ``_producer``.

    The signature also accepts ``None``, which leaves the publisher without
    a producer.
    """
    self._producer = producer

add_prefix #

add_prefix(prefix)
Source code in faststream/kafka/publisher/usecase.py
def add_prefix(self, prefix: str) -> None:
    """Prepend ``prefix`` to the publisher's topic name in place."""
    pieces = [prefix, self.topic]
    self.topic = "".join(pieces)

schema #

schema()

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/abc.py
def schema(self) -> Dict[str, Channel]:
    """Returns the schema of the API operation as a dictionary of channel names and channel objects.

    An empty dictionary is returned when ``include_in_schema`` is falsy.
    """
    # Guard clause: objects excluded from the schema contribute nothing.
    if not self.include_in_schema:
        return {}
    return self.get_schema()

add_middleware #

add_middleware(middleware)
Source code in faststream/broker/publisher/usecase.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append ``middleware`` to the broker middlewares, stored as a new tuple."""
    existing = tuple(self._broker_middlewares)
    self._broker_middlewares = existing + (middleware,)

create staticmethod #

create(
    *,
    batch: Literal[True],
    key: Optional[bytes],
    topic: str,
    partition: Optional[int],
    headers: Optional[Dict[str, str]],
    reply_to: str,
    broker_middlewares: Sequence[
        BrokerMiddleware[Tuple[ConsumerRecord, ...]]
    ],
    middlewares: Sequence[PublisherMiddleware],
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> AsyncAPIBatchPublisher
create(
    *,
    batch: Literal[False],
    key: Optional[bytes],
    topic: str,
    partition: Optional[int],
    headers: Optional[Dict[str, str]],
    reply_to: str,
    broker_middlewares: Sequence[
        BrokerMiddleware[ConsumerRecord]
    ],
    middlewares: Sequence[PublisherMiddleware],
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> AsyncAPIDefaultPublisher
create(
    *,
    batch: bool,
    key: Optional[bytes],
    topic: str,
    partition: Optional[int],
    headers: Optional[Dict[str, str]],
    reply_to: str,
    broker_middlewares: Sequence[
        BrokerMiddleware[
            Union[
                Tuple[ConsumerRecord, ...], ConsumerRecord
            ]
        ]
    ],
    middlewares: Sequence[PublisherMiddleware],
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> Union[
    AsyncAPIBatchPublisher, AsyncAPIDefaultPublisher
]
create(
    *,
    batch,
    key,
    topic,
    partition,
    headers,
    reply_to,
    broker_middlewares,
    middlewares,
    schema_,
    title_,
    description_,
    include_in_schema,
)
Source code in faststream/kafka/publisher/asyncapi.py
@override
@staticmethod
def create(
    *,
    batch: bool,
    key: Optional[bytes],
    topic: str,
    partition: Optional[int],
    headers: Optional[Dict[str, str]],
    reply_to: str,
    # Publisher args
    broker_middlewares: Sequence[
        "BrokerMiddleware[Union[Tuple[ConsumerRecord, ...], ConsumerRecord]]"
    ],
    middlewares: Sequence["PublisherMiddleware"],
    # AsyncAPI args
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> Union[
    "AsyncAPIBatchPublisher",
    "AsyncAPIDefaultPublisher",
]:
    """Build the publisher variant selected by ``batch``.

    A falsy ``batch`` yields an ``AsyncAPIDefaultPublisher`` that receives
    ``key``. A truthy ``batch`` yields an ``AsyncAPIBatchPublisher``, which is
    constructed without a key.

    Raises:
        SetupError: If ``batch`` is truthy and a truthy ``key`` is given.
    """
    if not batch:
        # Default (single-message) publisher: the only variant given a key.
        return AsyncAPIDefaultPublisher(
            key=key,
            # basic args
            topic=topic,
            partition=partition,
            headers=headers,
            reply_to=reply_to,
            broker_middlewares=broker_middlewares,
            middlewares=middlewares,
            schema_=schema_,
            title_=title_,
            description_=description_,
            include_in_schema=include_in_schema,
        )

    # The batch publisher is built without a key, so a truthy one is rejected.
    if key:
        raise SetupError("You can't setup `key` with batch publisher")

    return AsyncAPIBatchPublisher(
        topic=topic,
        partition=partition,
        headers=headers,
        reply_to=reply_to,
        broker_middlewares=broker_middlewares,
        middlewares=middlewares,
        schema_=schema_,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

get_name #

get_name()
Source code in faststream/kafka/publisher/asyncapi.py
def get_name(self) -> str:
    """Build a name from the topic followed by the ``:Publisher`` suffix."""
    topic_name = self.topic
    return f"{topic_name}:Publisher"

get_description #

get_description()

Description property fallback.

Source code in faststream/asyncapi/abc.py
def get_description(self) -> Optional[str]:
    """Description property fallback.

    This implementation provides no fallback text and always returns ``None``.
    """
    return None

get_schema #

get_schema()
Source code in faststream/kafka/publisher/asyncapi.py
def get_schema(self) -> Dict[str, Channel]:
    """Build the channel entry for this publisher, keyed by its name.

    The channel carries the publisher description, a publish operation whose
    message payload is resolved from ``get_payloads()``, and a Kafka channel
    binding for the publisher's topic.
    """
    payloads = self.get_payloads()

    message = Message(
        title=f"{self.name}:Message",
        payload=resolve_payloads(payloads, "Publisher"),
        correlationId=CorrelationId(location="$message.header#/correlation_id"),
    )
    kafka_binding = kafka.ChannelBinding(topic=self.topic)
    channel = Channel(
        description=self.description,
        publish=Operation(message=message),
        bindings=ChannelBinding(kafka=kafka_binding),
    )

    return {self.name: channel}

get_payloads #

get_payloads()
Source code in faststream/broker/publisher/usecase.py
def get_payloads(self) -> List[Tuple["AnyDict", str]]:
    """Collect ``(schema, name)`` payload pairs for this publisher.

    With a truthy ``schema_``, a single payload is derived from it and paired
    with an empty name. Otherwise one payload is derived from each entry of
    ``self.calls`` and paired with the camel-cased name of the unwrapped
    callable. Entries producing an empty schema are left out.
    """
    if self.schema_:
        params = {"response__": (self.schema_, ...)}

        # Placeholder call model: only its response model carries the schema.
        fake_model = create_model("Fake")
        response_model = create_model(  # type: ignore[call-overload]
            "",
            __config__=get_config_base(),
            **params,
        )
        explicit_model: CallModel[Any, Any] = CallModel(
            call=lambda: None,
            model=fake_model,
            response_model=response_model,
            params=params,
        )

        explicit_body = get_response_schema(
            explicit_model,
            prefix=f"{self.name}:Message",
        )
        return [(explicit_body, "")] if explicit_body else []  # pragma: no branch

    collected: List[Tuple[AnyDict, str]] = []
    for handler in self.calls:
        handler_body = get_response_schema(
            build_call_model(handler),
            prefix=f"{self.name}:Message",
        )
        if not handler_body:
            continue
        collected.append((handler_body, to_camelcase(unwrap(handler).__name__)))

    return collected

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/broker/publisher/usecase.py
def set_test(
    self,
    *,
    mock: Annotated[
        MagicMock,
        Doc("Mock object to check in tests."),
    ],
    with_fake: Annotated[
        bool,
        Doc("Whether publisher's fake subscriber created or not."),
    ],
) -> None:
    """Turn publisher to testing mode.

    Stores ``mock`` on the publisher and records ``with_fake`` in
    ``_fake_handler``.
    """
    self.mock = mock
    self._fake_handler = with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/broker/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode.

    Drops the stored mock and clears the ``_fake_handler`` flag.
    """
    self.mock = None
    self._fake_handler = False