Skip to content

Events Testing#

In the most cases you are testing your subsriber/publisher functions, but sometimes you need to trigger some lifespan hooks in your tests too.

For this reason, FastStream has a special TestApp patcher working as a regular async context manager.

import pytest

from faststream import FastStream, TestApp
from faststream.kafka import KafkaBroker, TestKafkaBroker

app = FastStream(KafkaBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with TestKafkaBroker(app.broker):
        async with TestApp(app):
            # test something
            pass
import pytest

from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker

app = FastStream(RabbitBroker())


@app.after_startup
async def handle():
    print("Calls in tests too!")


@pytest.mark.asyncio
async def test_lifespan():
    async with TestRabbitBroker(app.broker):
        async with TestApp(app):
            # test something
            pass

Tip

If you are using a connected broker inside withing your lifespan hooks, it's advisable to patch the broker first (before applying the application patch).


Last update: 2023-09-21