Skip to content

Events Testing#

In the most cases you are testing your subscriber/publisher functions, but sometimes you need to trigger some lifespan hooks in your tests too.

For this reason, FastStream has a special TestApp patcher working as a regular async context manager.

import pytest

from faststream import FastStream, TestApp
from faststream.kafka import KafkaBroker, TestKafkaBroker

app = FastStream(KafkaBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestKafkaBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass
import pytest

from faststream import FastStream, TestApp
from faststream.confluent import KafkaBroker, TestKafkaBroker

app = FastStream(KafkaBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestKafkaBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass
import pytest

from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker

app = FastStream(RabbitBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestRabbitBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass
import pytest

from faststream import FastStream, TestApp
from faststream.nats import NatsBroker, TestNatsBroker

app = FastStream(NatsBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestNatsBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass
import pytest

from faststream import FastStream, TestApp
from faststream.redis import RedisBroker, TestRedisBroker

app = FastStream(RedisBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestRedisBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass

Using with TestBroker#

If you want to use In-Memory patched broker in your tests, it's advised to patch the broker first (before applying the application patch).

Also, TestApp and TestBroker are both calling broker.start(). According to the original logic, broker should be started in the FastStream application, but if TestBroker is applied first – it breaks this behavior. For this reason, TestApp prevents TestBroker broker.start() call if it is placed inside the TestBroker context.

This behavior is controlled by connect_only argument to TestBroker. While None is the default value, TestApp can set it to True or False during the code execution. If connect_only argument is provided manually, it would not be changed.

Warning

With connect_only=False, all FastStream hooks will be called after the broker start, which can break some @app.on_startup logic.