Skip to content

FakeProducer

faststream.kafka.test.FakeProducer #

FakeProducer(broker: KafkaBroker)

Bases: AioKafkaFastProducer

A fake Kafka producer for testing purposes.

This class extends AioKafkaFastProducer and is used to simulate Kafka message publishing during tests.

Initialize the FakeProducer.

PARAMETER DESCRIPTION
broker

The KafkaBroker instance to associate with this FakeProducer.

TYPE: KafkaBroker

Source code in faststream/kafka/test.py
def __init__(self, broker: KafkaBroker) -> None:
    """Create a fake producer bound to the given broker.

    Args:
        broker: The KafkaBroker instance stored on this producer as ``self.broker``.
    """
    # NOTE(review): no parent initializer is invoked here; only the broker
    # reference is stored. Presumably deliberate for a test double -- confirm.
    self.broker = broker

broker instance-attribute #

broker = broker

publish async #

publish(message: SendableMessage, topic: str, key: Optional[bytes] = None, partition: Optional[int] = None, timestamp_ms: Optional[int] = None, headers: Optional[Dict[str, str]] = None, correlation_id: Optional[str] = None, *, reply_to: str = '', rpc: bool = False, rpc_timeout: Optional[float] = None, raise_timeout: bool = False) -> Optional[SendableMessage]

Publish a message to the Kafka broker.

PARAMETER DESCRIPTION
message

The message to be published.

TYPE: SendableMessage

topic

The Kafka topic to publish the message to.

TYPE: str

key

The message key. Defaults to None.

TYPE: Optional[bytes] DEFAULT: None

partition

The Kafka partition to use. Defaults to None.

TYPE: Optional[int] DEFAULT: None

timestamp_ms

The message timestamp in milliseconds. Defaults to None.

TYPE: Optional[int] DEFAULT: None

headers

Additional headers for the message. Defaults to None.

TYPE: Optional[Dict[str, str]] DEFAULT: None

correlation_id

The correlation ID for the message. Defaults to None.

TYPE: Optional[str] DEFAULT: None

reply_to

The topic to which responses should be sent. Defaults to "".

TYPE: str DEFAULT: ''

rpc

If True, treat the message as an RPC request. Defaults to False.

TYPE: bool DEFAULT: False

rpc_timeout

Timeout for RPC requests. Defaults to None.

TYPE: Optional[float] DEFAULT: None

raise_timeout

If True, raise an exception on timeout. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Optional[SendableMessage]

The response message if this was an RPC request, otherwise None.

Source code in faststream/kafka/test.py
@override
async def publish(  # type: ignore[override]
    self,
    message: SendableMessage,
    topic: str,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    *,
    reply_to: str = "",
    rpc: bool = False,
    rpc_timeout: Optional[float] = None,
    raise_timeout: bool = False,
) -> Optional[SendableMessage]:
    """Publish a message to the handlers registered on the broker.

    The message is built once with ``build_message`` and handed to every
    broker handler whose ``topics`` contain ``topic``. This matches
    ``publish_batch``, which also delivers to all matching handlers.
    For an RPC call, delivery stops at the first matching handler and its
    response is returned.

    Args:
        message (SendableMessage): The message to be published.
        topic (str): The Kafka topic to publish the message to.
        key (Optional[bytes], optional): The message key. Defaults to None.
        partition (Optional[int], optional): The Kafka partition to use. Defaults to None.
        timestamp_ms (Optional[int], optional): The message timestamp in milliseconds. Defaults to None.
        headers (Optional[Dict[str, str]], optional): Additional headers for the message. Defaults to None.
        correlation_id (Optional[str], optional): The correlation ID for the message. Defaults to None.
        reply_to (str, optional): The topic to which responses should be sent. Defaults to "".
        rpc (bool, optional): If True, treat the message as an RPC request. Defaults to False.
        rpc_timeout (Optional[float], optional): Timeout for RPC requests. Defaults to None.
        raise_timeout (bool, optional): If True, raise an exception on timeout. Defaults to False.

    Returns:
        Optional[SendableMessage]: The first matching handler's response if this was an
        RPC request, otherwise None. None is also returned when no handler matches.
    """
    incoming = build_message(
        message=message,
        topic=topic,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        correlation_id=correlation_id,
        reply_to=reply_to,
    )

    for handler in self.broker.handlers.values():  # pragma: no branch
        if topic not in handler.topics:
            continue

        # Batch handlers receive a list, so wrap the single message for them.
        response = await call_handler(
            handler=handler,
            message=[incoming] if handler.batch else incoming,
            rpc=rpc,
            rpc_timeout=rpc_timeout,
            raise_timeout=raise_timeout,
        )

        # Only an RPC caller waits for an answer, so only then stop at the
        # first subscriber. A plain publish must reach every matching
        # handler, not just the first one found.
        if rpc:
            return response

    return None

publish_batch async #

publish_batch(*msgs: SendableMessage, topic: str, partition: Optional[int] = None, timestamp_ms: Optional[int] = None, headers: Optional[Dict[str, str]] = None) -> None

Publish a batch of messages to the Kafka broker.

PARAMETER DESCRIPTION
*msgs

Variable number of messages to be published.

TYPE: SendableMessage DEFAULT: ()

topic

The Kafka topic to publish the messages to.

TYPE: str

partition

The Kafka partition to use. Defaults to None.

TYPE: Optional[int] DEFAULT: None

timestamp_ms

The message timestamp in milliseconds. Defaults to None.

TYPE: Optional[int] DEFAULT: None

headers

Additional headers for the messages. Defaults to None.

TYPE: Optional[Dict[str, str]] DEFAULT: None

RETURNS DESCRIPTION
None

This method does not return a value.

TYPE: None

Source code in faststream/kafka/test.py
async def publish_batch(
    self,
    *msgs: SendableMessage,
    topic: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
) -> None:
    """Deliver a batch of messages to every handler subscribed to ``topic``.

    Each broker handler whose ``topics`` contain ``topic`` is called once
    with a list holding one built message per entry in ``msgs``.

    Args:
        *msgs (SendableMessage): Variable number of messages to be published.
        topic (str): The Kafka topic to publish the messages to.
        partition (Optional[int], optional): The Kafka partition to use. Defaults to None.
        timestamp_ms (Optional[int], optional): The message timestamp in milliseconds. Defaults to None.
        headers (Optional[Dict[str, str]], optional): Additional headers for the messages. Defaults to None.

    Returns:
        None: This method does not return a value.
    """
    for subscriber in self.broker.handlers.values():  # pragma: no branch
        if topic not in subscriber.topics:
            continue

        # build_message is invoked anew for each matching handler, so the
        # batch is rebuilt per handler rather than shared between them.
        batch = [
            build_message(
                message=payload,
                topic=topic,
                partition=partition,
                timestamp_ms=timestamp_ms,
                headers=headers,
            )
            for payload in msgs
        ]
        await call_handler(handler=subscriber, message=batch)

stop async #

stop() -> None

Stop the underlying producer if one is set.

Source code in faststream/kafka/producer.py
async def stop(self) -> None:
    """Stop the underlying producer if one is set; otherwise do nothing."""
    producer = self._producer
    if producer is None:  # pragma: no branch
        return
    await producer.stop()