Skip to content

Publisher Decorator#

The second easiest way to publish messages is by using the Publisher Decorator. This method has an AsyncAPI representation and is suitable for quickly creating applications. However, it doesn't provide all testing features.

It creates a structured DataPipeline unit with an input and output. The order of Subscriber and Publisher decorators doesn't matter, but @broker.publisher(...) can be used only with functions already decorated by a @broker.subscriber(...).

Note

It uses the handler function's return type annotation to cast the function's return value before sending, so be accurate with it.

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
@broker.publisher("another-topic")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", topic="test-topic")
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
@broker.publisher("another-topic")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", topic="test-topic")
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
@broker.publisher("another-queue")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-queue")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", queue="test-queue")
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
@broker.publisher("another-subject")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-subject")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", subject="test-subject")
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test-channel")
@broker.publisher("another-channel")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-channel")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", channel="test-channel")

Message Broadcasting#

The decorator can be used multiple times with one function to broadcast the function's return:

@broker.subscriber("in")
@broker.publisher("first-out")
@broker.publisher("second-out")
async def handle(msg) -> str:
    return "Response"

This way you will send a copy of your return to the all output topics.

Note

Also, if this subscriber consumes a message with RPC mode, it sends a reply not only to the RPC channel but also to all publishers as well.

Details#

Additionally, @broker.publisher(...) automatically sends a message with the same correlation_id as the incoming message. This way, you get the same correlation_id for the entire message pipeline process across all services, allowing you to collect a trace.