Skip to content

Publisher Testing#

If you are working with a Publisher object (either decorator or direct), you can check outgoing messages as well. There are several testing features available:

  • In-memory TestClient
  • Publishing (including error handling)
  • Checking the incoming message body
  • Note about mock clearing after the context exits

Base application#

1
2
3
4
5
6
publisher = broker.publisher("another-topic")


@publisher
@broker.subscriber("test-topic")
async def handle() -> str:
1
2
3
4
5
6
publisher = broker.publisher("another-queue")


@publisher
@broker.subscriber("test-queue")
async def handle() -> str:
1
2
3
4
5
publisher = broker.publisher("another-topic")


@broker.subscriber("test-topic")
async def handle():
1
2
3
4
5
publisher = broker.publisher("another-queue")


@broker.subscriber("test-queue")
async def handle():

Testing#

import pytest

from faststream.kafka import TestKafkaBroker



@pytest.mark.asyncio
async def test_handle():
    async with TestKafkaBroker(broker) as br:
        await br.publish("", topic="test-topic")
import pytest

from faststream.rabbit import TestRabbitBroker



@pytest.mark.asyncio
async def test_handle():
    async with TestRabbitBroker(broker) as br:
        await br.publish("", queue="test-queue")
  • Testing with a real broker
  • Waiting for the consumer to be called

Last update: 2023-09-21