Skip to content

patch_broker_calls

faststream.testing.broker.patch_broker_calls #

patch_broker_calls(broker)

Patch broker calls.

Source code in faststream/testing/broker.py
def patch_broker_calls(broker: "BrokerUsecase[Any, Any]") -> None:
    """Patch broker calls."""
    broker._abc_start()

    for handler in broker._subscribers.values():
        for h in handler.calls:
            h.handler.set_test()