Skip to content

Events Testing#

In the most cases you are testing your subscriber/publisher functions, but sometimes you need to trigger some lifespan hooks in your tests too.

For this reason, FastStream has a special TestApp patcher working as a regular async context manager.

import pytest

from faststream import FastStream, TestApp
from faststream.kafka import KafkaBroker, TestKafkaBroker

app = FastStream(KafkaBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestKafkaBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass
import pytest

from faststream import FastStream, TestApp
from faststream.confluent import KafkaBroker, TestKafkaBroker

app = FastStream(KafkaBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestKafkaBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass
import pytest

from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker

app = FastStream(RabbitBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestRabbitBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass
import pytest

from faststream import FastStream, TestApp
from faststream.nats import NatsBroker, TestNatsBroker

app = FastStream(NatsBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestNatsBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass
import pytest

from faststream import FastStream, TestApp
from faststream.redis import RedisBroker, TestRedisBroker

app = FastStream(RedisBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with (
        TestRedisBroker(app.broker, connect_only=True),
        TestApp(app),
    ):
        # test something
        pass

Using with TestBroker#

If you want to use In-Memory patched broker in your tests, it's advisable to patch the broker first (before applying the application patch).

Also, TestApp and TestBroker are calling broker.start() both. According to the original logic, broker should be started in the FastStream application, but TestBroker applied first breaks this behavior. This reason TestApp prevents TestBroker broker.start() call if it placed inside TestBroker context.

This behavior is ruled by connect_only TestBroker argument. By default it has None value, but TestApp can set it to True/False by inner logic. To prevent this "magic", just setup connect_only argument manually.

Warning

With connect_only=False, all FastStream hooks will be called after broker was started, what can breaks some @app.on_startup logic.