
Publisher Testing

If you are working with a Publisher object (either as a decorator or directly), you have several testing features available:

  • In-memory TestClient
  • Publishing locally with error propagation
  • Checking the outgoing message body

Base Application

Let's take a look at a simple application example with a publisher as a decorator or as a direct call:

Kafka, publisher as a decorator:
publisher = broker.publisher("another-topic")

@publisher
@broker.subscriber("test-topic")
async def handle() -> str:
    return "Hi!"

Confluent, publisher as a decorator:
publisher = broker.publisher("another-topic")

@publisher
@broker.subscriber("test-topic")
async def handle() -> str:
    return "Hi!"

RabbitMQ, publisher as a decorator:
publisher = broker.publisher("another-queue")

@publisher
@broker.subscriber("test-queue")
async def handle() -> str:
    return "Hi!"

NATS, publisher as a decorator:
publisher = broker.publisher("another-subject")

@publisher
@broker.subscriber("test-subject")
async def handle() -> str:
    return "Hi!"

Redis, publisher as a decorator:
publisher = broker.publisher("another-channel")

@publisher
@broker.subscriber("test-channel")
async def handle() -> str:
    return "Hi!"

Kafka, publisher called directly:
publisher = broker.publisher("another-topic")

@broker.subscriber("test-topic")
async def handle():
    await publisher.publish("Hi!")

Confluent, publisher called directly:
publisher = broker.publisher("another-topic")

@broker.subscriber("test-topic")
async def handle():
    await publisher.publish("Hi!")

RabbitMQ, publisher called directly:
publisher = broker.publisher("another-queue")

@broker.subscriber("test-queue")
async def handle():
    await publisher.publish("Hi!")

NATS, publisher called directly:
publisher = broker.publisher("another-subject")

@broker.subscriber("test-subject")
async def handle():
    await publisher.publish("Hi!")

Redis, publisher called directly:
publisher = broker.publisher("another-channel")

@broker.subscriber("test-channel")
async def handle():
    await publisher.publish("Hi!")

Testing

To test it, patch your broker with the matching TestBroker class (TestKafkaBroker, TestRabbitBroker, and so on), used as an async context manager.

Kafka:
import pytest

from faststream.kafka import TestKafkaBroker

@pytest.mark.asyncio
async def test_handle():
    async with TestKafkaBroker(broker) as br:
        await br.publish("", topic="test-topic")

Confluent:
import pytest

from faststream.confluent import TestKafkaBroker

@pytest.mark.asyncio
async def test_handle():
    async with TestKafkaBroker(broker) as br:
        await br.publish("", topic="test-topic")

RabbitMQ:
import pytest

from faststream.rabbit import TestRabbitBroker

@pytest.mark.asyncio
async def test_handle():
    async with TestRabbitBroker(broker) as br:
        await br.publish("", queue="test-queue")

NATS:
import pytest

from faststream.nats import TestNatsBroker

@pytest.mark.asyncio
async def test_handle():
    async with TestNatsBroker(broker) as br:
        await br.publish("", subject="test-subject")

Redis:
import pytest

from faststream.redis import TestRedisBroker

@pytest.mark.asyncio
async def test_handle():
    async with TestRedisBroker(broker) as br:
        await br.publish("", channel="test-channel")

By default, it patches your broker to run in memory, so you can use it without any external broker. This is especially useful in CI and in local development environments.

Also, it allows you to check the outgoing message body in the same way as with a subscriber.

publisher.mock.assert_called_once_with("Hi!")

Note

The Publisher mock does not simply record the value passed to the publish method. Instead, TestBroker sets up a virtual consumer for the outgoing topic, consumes the published message, and stores the consumed message in the mock.
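
The idea in this note can be illustrated with a toy model built only on the standard library. This is not FastStream's implementation: ToyBroker, attach_publisher_mock, and the JSON round trip are all hypothetical stand-ins, chosen to show that the mock is filled by a consumer on the outgoing topic and not by intercepting the publish call.

```python
import asyncio
import json
from collections import defaultdict
from unittest.mock import MagicMock


class ToyBroker:
    """Hypothetical in-memory broker, for illustration only."""

    def __init__(self) -> None:
        self._consumers = defaultdict(list)

    def subscribe(self, topic, consumer) -> None:
        self._consumers[topic].append(consumer)

    async def publish(self, message, topic) -> None:
        # Encode once, then let every consumer decode its own copy,
        # so consumers see the message as it would arrive off the wire.
        payload = json.dumps(message)
        for consumer in self._consumers[topic]:
            await consumer(json.loads(payload))


def attach_publisher_mock(broker: ToyBroker, topic: str) -> MagicMock:
    """Register a virtual consumer on `topic` that records what it consumes."""
    mock = MagicMock()

    async def virtual_consumer(message) -> None:
        mock(message)

    broker.subscribe(topic, virtual_consumer)
    return mock


async def main() -> MagicMock:
    broker = ToyBroker()
    mock = attach_publisher_mock(broker, "another-topic")

    async def handle(message) -> None:
        # Plays the role of the subscriber that publishes a reply.
        await broker.publish("Hi!", topic="another-topic")

    broker.subscribe("test-topic", handle)
    await broker.publish("", topic="test-topic")
    return mock


publisher_mock = asyncio.run(main())
publisher_mock.assert_called_once_with("Hi!")
```

Because the mock sits on the consuming side, the assertion checks the message after it has been encoded and decoded, which is closer to what a downstream service would actually receive.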

Additionally, TestBroker can be used with a real external broker, which lets you reuse the same tests as end-to-end tests. For more information, please visit the subscriber testing page.