Skip to content

TestRabbitBroker

faststream.rabbit.TestRabbitBroker #

TestRabbitBroker(
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
)

Bases: TestBroker[RabbitBroker]

A class to test RabbitMQ brokers.

Initialize a class instance.

PARAMETER DESCRIPTION
broker

An instance of the Broker class.

TYPE: Broker

with_real

Whether to use a real broker.

TYPE: bool DEFAULT: False

connect_only

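Whether to only connect to the broker. If None (the default), the value is detected automatically; if that detection fails, a RuntimeWarning is emitted and False is used.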

TYPE: Optional[bool] DEFAULT: None

Source code in faststream/broker/test.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
) -> None:
    """Store the broker and the testing flags on the instance.

    Args:
        broker: An instance of the Broker class; kept as ``self.broker``.
        with_real: Whether to use a real broker; kept as ``self.with_real``.
        connect_only: Whether to only connect to the broker. When ``None``,
            the value is obtained from ``is_contains_context_name``; if that
            call raises, a ``RuntimeWarning`` is emitted and ``False`` is used.

    """
    self.broker = broker
    self.with_real = with_real

    if connect_only is not None:
        # An explicit value from the caller is stored untouched.
        self.connect_only = connect_only
        return

    # NOTE(review): the warning text below mentions AST parsing, so the helper
    # presumably inspects the calling code; keep this call directly inside
    # ``__init__`` rather than moving it into another function -- confirm
    # before refactoring.
    try:
        detected = is_contains_context_name(
            self.__class__.__name__,
            TestApp.__name__,
        )

    except Exception as e:  # pragma: no cover
        # TODO: remove with 0.5.0
        message = (
            f"\nError `{e!r}` occurred at `{self.__class__.__name__}` AST parsing"
            "\nPlease, report us by creating an Issue with your TestClient use case"
            "\nhttps://github.com/airtai/faststream/issues/new?labels=bug&template=bug_report.md&title=Bug:%20TestClient%20AST%20parsing"
        )
        warnings.warn(
            message,
            category=RuntimeWarning,
            stacklevel=1,
        )

        # Detection failed: fall back to not being connect-only.
        detected = False

    self.connect_only = detected

broker instance-attribute #

broker = broker

connect_only instance-attribute #

connect_only = connect_only

with_real instance-attribute #

with_real = with_real

create_publisher_fake_subscriber staticmethod #

create_publisher_fake_subscriber(
    broker: RabbitBroker, publisher: Publisher
) -> HandlerCallWrapper[Any, Any, Any]
Source code in faststream/rabbit/test.py
@staticmethod
def create_publisher_fake_subscriber(
    broker: RabbitBroker,
    publisher: Publisher,
) -> HandlerCallWrapper[Any, Any, Any]:
    """Wrap a do-nothing handler with ``broker.subscriber`` for ``publisher``.

    Args:
        broker: Broker whose ``subscriber`` decorator factory is used.
        publisher: Publisher supplying the ``queue`` and ``exchange`` to
            subscribe on.

    Returns:
        Whatever the ``broker.subscriber(...)`` decorator returns for the
        handler.
    """
    # Build the decorator first, then apply it explicitly to the handler.
    subscribe = broker.subscriber(
        queue=publisher.queue,
        exchange=publisher.exchange,
        _raw=True,
    )

    # The handler intentionally ignores the message it receives.
    def f(msg: Any) -> None:
        pass

    return subscribe(f)

patch_publisher staticmethod #

patch_publisher(
    broker: RabbitBroker, publisher: Any
) -> None
Source code in faststream/rabbit/test.py
@staticmethod
def patch_publisher(broker: RabbitBroker, publisher: Any) -> None:
    """Point ``publisher._producer`` at the broker's ``_producer`` object.

    Args:
        broker: Broker whose ``_producer`` is reused.
        publisher: Object whose ``_producer`` attribute is overwritten.
    """
    shared_producer = broker._producer
    publisher._producer = shared_producer

remove_publisher_fake_subscriber staticmethod #

remove_publisher_fake_subscriber(
    broker: RabbitBroker, publisher: Publisher
) -> None
Source code in faststream/rabbit/test.py
@staticmethod
def remove_publisher_fake_subscriber(
    broker: RabbitBroker,
    publisher: Publisher,
) -> None:
    """Remove the handler stored under the publisher's routing hash, if any.

    Args:
        broker: Broker whose ``handlers`` mapping is modified.
        publisher: Publisher whose ``_get_routing_hash()`` gives the key.
    """
    handlers = broker.handlers
    routing_hash = publisher._get_routing_hash()
    # The ``None`` default makes this a no-op when the key is absent.
    handlers.pop(routing_hash, None)