Skip to content

Publisher Object#

The Publisher Object provides a full-featured way to publish messages. It has an AsyncAPI representation and includes testability features. This method creates a reusable Publisher object.

Additionally, this object can be used as a decorator. The order of Subscriber and Publisher decorators doesn't matter, but @publisher can be used only with functions already decorated by a @broker.subscriber(...).

Note

It uses the handler function's return type annotation to cast the function's return value before sending, so be accurate with it.

Pros and Cons

AsyncAPI support - AsyncAPI is a specification for describing asynchronous APIs used in messaging applications. This method supports this standard.

Testing support - This method has full Testing support.

Broker availability from Context - You can leverage FastStream's Context, a built-in Dependency Injection (DI) container, to work with brokers or other external services.

Reusable - This method is reusable.

The message will always be published.

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("another-topic")

@publisher
@broker.subscriber("test-topic")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("another-topic")

@publisher
@broker.subscriber("test-topic")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

publisher = broker.publisher("another-queue")

@publisher
@broker.subscriber("test-queue")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-queue")
async def handle_next(msg: str):
    assert msg == "Hi!"
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

publisher = broker.publisher("another-subject")

@publisher
@broker.subscriber("test-subject")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-subject")
async def handle_next(msg: str):
    assert msg == "Hi!"
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

publisher = broker.publisher("another-channel")

@publisher
@broker.subscriber("test-channel")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-channel")
async def handle_next(msg: str):
    assert msg == "Hi!"

Message Broadcasting#

The decorator can be used multiple times with one function to broadcast the function's return:

@publisher1
@publisher2
@broker.subscriber("in")
async def handle(msg) -> str:
    return "Response"

This way, you will send a copy of your return to all output topics.

Note

Also, if this subscriber consumes a message with RPC mode, it sends a reply not only to the RPC channel but also to all publishers as well.

Details#

Additionally, @publisher automatically sends a message with the same correlation_id as the incoming message. This way, you get the same correlation_id for the entire message pipeline process across all services, allowing you to collect a trace.