Skip to content

TestRabbitBroker

faststream.rabbit.TestRabbitBroker #

TestRabbitBroker(
    broker: Broker,
    with_real: bool = False,
    connect_only: bool = False,
)

Bases: TestBroker[RabbitBroker]

Source code in faststream/broker/test.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: bool = False,
):
    self.with_real = with_real
    self.broker = broker
    self.connect_only = connect_only

broker instance-attribute #

broker = broker

connect_only instance-attribute #

connect_only = connect_only

with_real instance-attribute #

with_real = with_real

create_publisher_fake_subscriber staticmethod #

create_publisher_fake_subscriber(
    broker: RabbitBroker, publisher: Publisher
) -> HandlerCallWrapper[Any, Any, Any]
Source code in faststream/rabbit/test.py
@staticmethod
def create_publisher_fake_subscriber(
    broker: RabbitBroker,
    publisher: Publisher,
) -> HandlerCallWrapper[Any, Any, Any]:
    @broker.subscriber(
        queue=publisher.queue,
        exchange=publisher.exchange,
        _raw=True,
    )
    def f(msg: Any) -> None:
        pass

    return f

patch_publisher staticmethod #

patch_publisher(
    broker: RabbitBroker, publisher: Any
) -> None
Source code in faststream/rabbit/test.py
@staticmethod
def patch_publisher(broker: RabbitBroker, publisher: Any) -> None:
    publisher._producer = broker._producer

remove_publisher_fake_subscriber staticmethod #

remove_publisher_fake_subscriber(
    broker: RabbitBroker, publisher: Publisher
) -> None
Source code in faststream/rabbit/test.py
@staticmethod
def remove_publisher_fake_subscriber(
    broker: RabbitBroker,
    publisher: Publisher,
) -> None:
    broker.handlers.pop(
        publisher._get_routing_hash(),
        None,
    )

Last update: 2023-11-13