importpytestfromfaststreamimportFastStream,TestAppfromfaststream.kafkaimportKafkaBroker,TestKafkaBrokerapp=FastStream(KafkaBroker())@app.after_startupasyncdefhandle():print("Calls in tests too!")@pytest.mark.asyncioasyncdeftest_lifespan():asyncwith(TestKafkaBroker(app.broker,connect_only=True),TestApp(app),):# test somethingpass
importpytestfromfaststreamimportFastStream,TestAppfromfaststream.confluentimportKafkaBroker,TestKafkaBrokerapp=FastStream(KafkaBroker())@app.after_startupasyncdefhandle():print("Calls in tests too!")@pytest.mark.asyncioasyncdeftest_lifespan():asyncwith(TestKafkaBroker(app.broker,connect_only=True),TestApp(app),):# test somethingpass
importpytestfromfaststreamimportFastStream,TestAppfromfaststream.rabbitimportRabbitBroker,TestRabbitBrokerapp=FastStream(RabbitBroker())@app.after_startupasyncdefhandle():print("Calls in tests too!")@pytest.mark.asyncioasyncdeftest_lifespan():asyncwith(TestRabbitBroker(app.broker,connect_only=True),TestApp(app),):# test somethingpass
importpytestfromfaststreamimportFastStream,TestAppfromfaststream.natsimportNatsBroker,TestNatsBrokerapp=FastStream(NatsBroker())@app.after_startupasyncdefhandle():print("Calls in tests too!")@pytest.mark.asyncioasyncdeftest_lifespan():asyncwith(TestNatsBroker(app.broker,connect_only=True),TestApp(app),):# test somethingpass
importpytestfromfaststreamimportFastStream,TestAppfromfaststream.redisimportRedisBroker,TestRedisBrokerapp=FastStream(RedisBroker())@app.after_startupasyncdefhandle():print("Calls in tests too!")@pytest.mark.asyncioasyncdeftest_lifespan():asyncwith(TestRedisBroker(app.broker,connect_only=True),TestApp(app),):# test somethingpass
If you want to use In-Memory patched broker in your tests, it's advised to patch the broker first (before applying the application patch).
Also, TestApp and TestBroker are both calling broker.start(). According to the original logic, broker should be started in the FastStream application, but if TestBroker is applied first – it breaks this behavior. For this reason, TestApp prevents TestBrokerbroker.start() call if it is placed inside the TestBroker context.
This behavior is controlled by connect_only argument to TestBroker. While None is the default value, TestApp can set it to True or False during the code execution. If connect_only argument is provided manually, it would not be changed.
Warning
With connect_only=False, all FastStream hooks will be called after the broker start, which can break some @app.on_startup logic.