Skip to content

TestKafkaBroker

faststream.kafka.testing.TestKafkaBroker #

TestKafkaBroker(broker, with_real=False, connect_only=None)

Bases: TestBroker[KafkaBroker]

A class to test Kafka brokers.

Source code in faststream/testing/broker.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
) -> None:
    self.with_real = with_real
    self.broker = broker

    if connect_only is None:
        try:
            connect_only = is_contains_context_name(
                self.__class__.__name__,
                TestApp.__name__,
            )
        except Exception:  # pragma: no cover
            warnings.warn(
                (
                    "\nError `{e!r}` occurred at `{self.__class__.__name__}` AST parsing."
                    "\n`connect_only` is set to `False` by default."
                ),
                category=RuntimeWarning,
                stacklevel=1,
            )

            connect_only = False

    self.connect_only = connect_only
    self._fake_subscribers: List[SubscriberProto[Any]] = []

with_real instance-attribute #

with_real = with_real

broker instance-attribute #

broker = broker

connect_only instance-attribute #

connect_only = connect_only

create_publisher_fake_subscriber staticmethod #

create_publisher_fake_subscriber(broker, publisher)
Source code in faststream/kafka/testing.py
@staticmethod
def create_publisher_fake_subscriber(
    broker: KafkaBroker,
    publisher: "AsyncAPIPublisher[Any]",
) -> Tuple["LogicSubscriber[Any]", bool]:
    sub: Optional[LogicSubscriber[Any]] = None
    for handler in broker._subscribers.values():
        if _is_handler_matches(handler, publisher.topic, publisher.partition):
            sub = handler
            break

    if sub is None:
        is_real = False

        if publisher.partition:
            tp = TopicPartition(
                topic=publisher.topic, partition=publisher.partition
            )
            sub = broker.subscriber(
                partitions=[tp],
                batch=isinstance(publisher, AsyncAPIBatchPublisher),
            )
        else:
            sub = broker.subscriber(
                publisher.topic,
                batch=isinstance(publisher, AsyncAPIBatchPublisher),
            )
    else:
        is_real = True

    return sub, is_real