Skip to content

TestBroker

faststream.broker.test.TestBroker #

TestBroker(
    broker: Broker,
    with_real: bool = False,
    connect_only: bool = False,
)

Bases: Generic[Broker]

Source code in faststream/broker/test.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: bool = False,
):
    """Record the broker under test together with its two mode flags.

    Nothing is validated or connected here; the arguments are only
    stored on the instance under the same names.

    Args:
        broker: The broker object this helper wraps.
        with_real: Stored as ``self.with_real``. Presumably selects a
            real broker connection instead of an in-memory fake -
            confirm against the code that reads the flag (not shown).
        connect_only: Stored as ``self.connect_only``. Presumably
            limits the helper to connecting without starting
            consumers - confirm against the code that reads the flag.
    """
    # One unpacking assignment; targets are bound left to right, so the
    # attributes are created in the order with_real, broker, connect_only.
    (self.with_real, self.broker, self.connect_only) = (
        with_real,
        broker,
        connect_only,
    )

broker instance-attribute #

broker = broker

connect_only instance-attribute #

connect_only = connect_only

with_real instance-attribute #

with_real = with_real

create_publisher_fake_subscriber abstractmethod staticmethod #

create_publisher_fake_subscriber(
    broker: Broker, publisher: Any
) -> HandlerCallWrapper[Any, Any, Any]
Source code in faststream/broker/test.py
@staticmethod
@abstractmethod
def create_publisher_fake_subscriber(
    broker: Broker,
    publisher: Any,
) -> HandlerCallWrapper[Any, Any, Any]:
    """Abstract hook that concrete test brokers must override.

    Judging by the name and return annotation, an implementation is
    expected to register a stand-in subscriber for ``publisher`` on
    ``broker`` and return its handler wrapper - confirm against the
    concrete subclasses, which are not shown here.

    Args:
        broker: The broker being tested.
        publisher: The broker-specific publisher object.

    Raises:
        NotImplementedError: Always, when this base version is called.
    """
    raise NotImplementedError

patch_publisher abstractmethod staticmethod #

patch_publisher(broker: Broker, publisher: Any) -> None
Source code in faststream/broker/test.py
@staticmethod
@abstractmethod
def patch_publisher(
    broker: Broker,
    publisher: Any,
) -> None:
    """Abstract hook that concrete test brokers must override.

    Judging by the name, an implementation is expected to modify
    ``publisher`` so it can be used in a test run - confirm against the
    concrete subclasses, which are not shown here.

    Args:
        broker: The broker being tested.
        publisher: The broker-specific publisher object.

    Raises:
        NotImplementedError: Always, when this base version is called.
    """
    raise NotImplementedError

remove_publisher_fake_subscriber abstractmethod staticmethod #

remove_publisher_fake_subscriber(
    broker: Broker, publisher: Any
) -> None
Source code in faststream/broker/test.py
@staticmethod
@abstractmethod
def remove_publisher_fake_subscriber(
    broker: Broker,
    publisher: Any,
) -> None:
    """Abstract hook that concrete test brokers must override.

    Judging by the name, an implementation is expected to undo what
    ``create_publisher_fake_subscriber`` set up for ``publisher`` -
    confirm against the concrete subclasses, which are not shown here.

    Args:
        broker: The broker being tested.
        publisher: The broker-specific publisher object.

    Raises:
        NotImplementedError: Always, when this base version is called.
    """
    raise NotImplementedError

Last update: 2023-11-13