TestNatsBroker(
broker: Broker,
with_real: bool = False,
connect_only: bool = False,
)
Bases: TestBroker[NatsBroker]
Source code in faststream/broker/test.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: bool = False,
) -> None:
    """Store the broker under test and the two test-mode flags.

    Args:
        broker: Broker instance this test helper operates on.
        with_real: Flag stored unchanged on the instance (default False).
        connect_only: Flag stored unchanged on the instance (default False).
    """
    # Plain attribute storage in signature order; nothing is validated,
    # connected, or started here.
    self.broker = broker
    self.with_real = with_real
    self.connect_only = connect_only
broker instance-attribute
broker = broker
connect_only instance-attribute
connect_only = connect_only
with_real instance-attribute
with_real = with_real
create_publisher_fake_subscriber staticmethod
create_publisher_fake_subscriber(
broker: NatsBroker, publisher: Publisher
) -> HandlerCallWrapper[Any, Any, Any]
Source code in faststream/nats/test.py
| @staticmethod
def create_publisher_fake_subscriber(
broker: NatsBroker,
publisher: Publisher,
) -> HandlerCallWrapper[Any, Any, Any]:
@broker.subscriber(publisher.subject, _raw=True)
def f(msg: Any) -> None:
pass
return f
|
patch_publisher staticmethod
patch_publisher(broker: NatsBroker, publisher: Any) -> None
Source code in faststream/nats/test.py
| @staticmethod
def patch_publisher(broker: NatsBroker, publisher: Any) -> None:
publisher._producer = broker._producer
|
remove_publisher_fake_subscriber staticmethod
remove_publisher_fake_subscriber(
broker: NatsBroker, publisher: Publisher
) -> None
Source code in faststream/nats/test.py
@staticmethod
def remove_publisher_fake_subscriber(
    broker: NatsBroker,
    publisher: Publisher,
) -> None:
    """Drop the handler entry keyed by the publisher's subject, if present.

    Args:
        broker: Broker whose ``handlers`` mapping is modified.
        publisher: Publisher whose ``subject`` determines the key to remove.
    """
    # Bind ``pop`` before hashing so attribute lookups keep their original
    # order relative to the ``Handler.get_routing_hash`` call.
    discard = broker.handlers.pop
    routing_key = Handler.get_routing_hash(publisher.subject)
    # A default is passed to ``pop`` so an absent key is not an error
    # (assuming ``handlers`` follows the dict ``pop`` contract).
    discard(routing_key, None)