Skip to content

TestNatsBroker

faststream.nats.test.TestNatsBroker #

TestNatsBroker(
    broker: Broker,
    with_real: bool = False,
    connect_only: bool = False,
)

Bases: TestBroker[NatsBroker]

Source code in faststream/broker/test.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: bool = False,
):
    """Store the broker and the two mode flags on the instance.

    Args:
        broker: Object kept unchanged as ``self.broker``.
        with_real: Flag kept unchanged as ``self.with_real``.
        connect_only: Flag kept unchanged as ``self.connect_only``.
    """
    # Nothing is validated, connected or started here; the arguments are
    # only recorded for later use by the rest of the class.
    self.broker = broker
    self.with_real, self.connect_only = with_real, connect_only

broker instance-attribute #

broker = broker

connect_only instance-attribute #

connect_only = connect_only

with_real instance-attribute #

with_real = with_real

create_publisher_fake_subscriber staticmethod #

create_publisher_fake_subscriber(
    broker: NatsBroker, publisher: Publisher
) -> HandlerCallWrapper[Any, Any, Any]
Source code in faststream/nats/test.py
@staticmethod
def create_publisher_fake_subscriber(
    broker: NatsBroker,
    publisher: Publisher,
) -> HandlerCallWrapper[Any, Any, Any]:
    """Register a do-nothing handler on the publisher's subject.

    Args:
        broker: Broker whose ``subscriber`` decorator factory is called.
        publisher: Publisher whose ``subject`` is subscribed to.

    Returns:
        Whatever the ``broker.subscriber(...)`` decorator returns for the
        placeholder handler.
    """
    # Build the decorator first, exactly as the ``@`` form would.
    # NOTE(review): ``_raw=True`` is forwarded as-is; its meaning is defined
    # by ``broker.subscriber`` -- confirm there before changing it.
    register = broker.subscriber(publisher.subject, _raw=True)

    def f(msg: Any) -> None:
        # Placeholder handler: accepts the message and does nothing with it.
        return None

    return register(f)

patch_publisher staticmethod #

patch_publisher(broker: NatsBroker, publisher: Any) -> None
Source code in faststream/nats/test.py
@staticmethod
def patch_publisher(broker: NatsBroker, publisher: Any) -> None:
    """Make the publisher use the broker's producer.

    Args:
        broker: Source of the ``_producer`` attribute.
        publisher: Object whose ``_producer`` attribute is overwritten.
    """
    # Both objects end up referencing the very same producer instance.
    shared_producer = broker._producer
    publisher._producer = shared_producer

remove_publisher_fake_subscriber staticmethod #

remove_publisher_fake_subscriber(
    broker: NatsBroker, publisher: Publisher
) -> None
Source code in faststream/nats/test.py
@staticmethod
def remove_publisher_fake_subscriber(
    broker: NatsBroker, publisher: Publisher
) -> None:
    """Drop the handler registered for the publisher's subject, if any.

    Args:
        broker: Broker whose ``handlers`` mapping is modified.
        publisher: Publisher whose ``subject`` identifies the handler.
    """
    registered = broker.handlers
    routing_key = Handler.get_routing_hash(publisher.subject)
    # The ``None`` default makes removal a no-op when the key is absent.
    registered.pop(routing_key, None)

Last update: 2023-11-13