Skip to content

FakeProducer

faststream.kafka.testing.FakeProducer #

FakeProducer(broker)

Bases: AioKafkaFastProducer

A fake Kafka producer for testing purposes.

This class extends AioKafkaFastProducer and is used to simulate Kafka message publishing during tests.

Source code in faststream/kafka/testing.py
def __init__(self, broker: KafkaBroker) -> None:
    self.broker = broker

    default = AioKafkaParser(
        msg_class=KafkaMessage,
        regex=None,
    )

    self._parser = resolve_custom_func(broker._parser, default.parse_message)
    self._decoder = resolve_custom_func(broker._decoder, default.decode_message)

broker instance-attribute #

broker = broker

stop async #

stop()
Source code in faststream/kafka/publisher/producer.py
async def stop(self) -> None:
    await self._producer.stop()

publish async #

publish(message, topic, key=None, partition=None, timestamp_ms=None, headers=None, correlation_id=None, *, reply_to='', rpc=False, rpc_timeout=None, raise_timeout=False, no_confirm=False)

Publish a message to the Kafka broker.

Source code in faststream/kafka/testing.py
@override
async def publish(  # type: ignore[override]
    self,
    message: "SendableMessage",
    topic: str,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    *,
    reply_to: str = "",
    rpc: bool = False,
    rpc_timeout: Optional[float] = None,
    raise_timeout: bool = False,
    no_confirm: bool = False,
) -> Optional[Any]:
    """Publish a message to the Kafka broker."""
    incoming = build_message(
        message=message,
        topic=topic,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        correlation_id=correlation_id,
        reply_to=reply_to,
    )

    return_value = None

    for handler in self.broker._subscribers.values():  # pragma: no branch
        if _is_handler_matches(handler, topic, partition):
            msg_to_send = (
                [incoming]
                if isinstance(handler, AsyncAPIBatchSubscriber)
                else incoming
            )

            with timeout_scope(rpc_timeout, raise_timeout):
                response_msg = await self._execute_handler(
                    msg_to_send, topic, handler
                )
                if rpc:
                    return_value = return_value or await self._decoder(
                        await self._parser(response_msg)
                    )

    return return_value

request async #

request(message, topic, key=None, partition=None, timestamp_ms=None, headers=None, correlation_id=None, *, timeout=0.5)
Source code in faststream/kafka/testing.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    topic: str,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    *,
    timeout: Optional[float] = 0.5,
) -> "ConsumerRecord":
    incoming = build_message(
        message=message,
        topic=topic,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        correlation_id=correlation_id,
    )

    for handler in self.broker._subscribers.values():  # pragma: no branch
        if _is_handler_matches(handler, topic, partition):
            msg_to_send = (
                [incoming]
                if isinstance(handler, AsyncAPIBatchSubscriber)
                else incoming
            )

            with anyio.fail_after(timeout):
                return await self._execute_handler(msg_to_send, topic, handler)

    raise SubscriberNotFound

publish_batch async #

publish_batch(*msgs, topic, partition=None, timestamp_ms=None, headers=None, reply_to='', correlation_id=None, no_confirm=False)

Publish a batch of messages to the Kafka broker.

Source code in faststream/kafka/testing.py
async def publish_batch(
    self,
    *msgs: "SendableMessage",
    topic: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    no_confirm: bool = False,
) -> None:
    """Publish a batch of messages to the Kafka broker."""
    for handler in self.broker._subscribers.values():  # pragma: no branch
        if _is_handler_matches(handler, topic, partition):
            messages = (
                build_message(
                    message=message,
                    topic=topic,
                    partition=partition,
                    timestamp_ms=timestamp_ms,
                    headers=headers,
                    correlation_id=correlation_id,
                    reply_to=reply_to,
                )
                for message in msgs
            )

            if isinstance(handler, AsyncAPIBatchSubscriber):
                await self._execute_handler(list(messages), topic, handler)

            else:
                for m in messages:
                    await self._execute_handler(m, topic, handler)
    return None