Skip to content

TestKafkaBroker

faststream.kafka.test.TestKafkaBroker #

TestKafkaBroker(
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
)

Bases: TestBroker[KafkaBroker]

A class to test Kafka brokers.

Initialize a class instance.

PARAMETER DESCRIPTION
broker

An instance of the Broker class.

TYPE: Broker

with_real

Whether to use a real broker.

TYPE: bool DEFAULT: False

connect_only

Whether to only connect to the broker.

TYPE: Optional[bool] DEFAULT: None

Source code in faststream/broker/test.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
) -> None:
    """Initialize a class instance.

    Args:
        broker: An instance of the Broker class.
        with_real: Whether to use a real broker.
        connect_only: Whether to only connect to the broker.

    """
    self.with_real = with_real
    self.broker = broker

    if connect_only is None:
        try:
            connect_only = is_contains_context_name(
                self.__class__.__name__,
                TestApp.__name__,
            )

        except Exception as e:  # pragma: no cover
            # TODO: remove with 0.5.0
            warnings.warn(
                (
                    f"\nError `{e!r}` occurred at `{self.__class__.__name__}` AST parsing"
                    "\nPlease, report us by creating an Issue with your TestClient use case"
                    "\nhttps://github.com/airtai/faststream/issues/new?labels=bug&template=bug_report.md&title=Bug:%20TestClient%20AST%20parsing"
                ),
                category=RuntimeWarning,
                stacklevel=1,
            )

            connect_only = False

    self.connect_only = connect_only

broker instance-attribute #

broker = broker

connect_only instance-attribute #

connect_only = connect_only

with_real instance-attribute #

with_real = with_real

create_publisher_fake_subscriber staticmethod #

create_publisher_fake_subscriber(
    broker: KafkaBroker, publisher: Publisher
) -> HandlerCallWrapper[Any, Any, Any]
Source code in faststream/kafka/test.py
@staticmethod
def create_publisher_fake_subscriber(
    broker: KafkaBroker,
    publisher: Publisher,
) -> HandlerCallWrapper[Any, Any, Any]:
    @broker.subscriber(  # type: ignore[call-overload,misc]
        publisher.topic,
        batch=publisher.batch,
        _raw=True,
    )
    def f(msg: Any) -> None:
        pass

    return f  # type: ignore[no-any-return]

patch_publisher staticmethod #

patch_publisher(
    broker: KafkaBroker, publisher: Any
) -> None
Source code in faststream/kafka/test.py
@staticmethod
def patch_publisher(broker: KafkaBroker, publisher: Any) -> None:
    publisher._producer = broker._producer

remove_publisher_fake_subscriber staticmethod #

remove_publisher_fake_subscriber(
    broker: KafkaBroker, publisher: Publisher
) -> None
Source code in faststream/kafka/test.py
@staticmethod
def remove_publisher_fake_subscriber(
    broker: KafkaBroker, publisher: Publisher
) -> None:
    broker.handlers.pop(publisher.topic, None)