Publisher Object

The Publisher Object provides a full-featured way to publish messages: it is represented in the AsyncAPI schema and can be tested. Calling broker.publisher(...) creates a reusable Publisher object.

The Publisher object can be used as a function decorator. The order of the subscriber and publisher decorators doesn't matter, but a publisher decorator can only be applied to a function that is also decorated by a subscriber decorator.

It also uses the handler function's return type annotation to cast the return value before sending, so make sure the annotation matches what the function actually returns. For example, with Kafka:

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("another-topic")


@publisher
@broker.subscriber("test-topic")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", topic="test-topic")

The same example with RabbitMQ:

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

publisher = broker.publisher("another-queue")


@publisher
@broker.subscriber("test-queue")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-queue")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", queue="test-queue")

You can apply several publishers to one function to broadcast the function's return value:

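# publisher1 and publisher2 are created beforehand; the destination names are illustrative
publisher1 = broker.publisher("response-1")
publisher2 = broker.publisher("response-2")


@publisher1
@publisher2
@broker.subscriber("in")
async def handle(msg) -> str:
    return "Response"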
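
To make the broadcast behaviour easier to picture, here is a minimal stdlib-only sketch of stacked decorators that each forward the same return value to a different destination. It is not FastStream's implementation: toy_publisher, the sent dictionary, and the destination names "out-1" and "out-2" are hypothetical stand-ins, and the subscriber decorator is left out so the snippet runs without a broker.

```python
import asyncio
from collections import defaultdict
from functools import wraps

# Toy in-memory stand-in for a broker: destination name -> list of sent messages.
sent = defaultdict(list)


def toy_publisher(destination):
    """Return a decorator that copies the handler's return value to `destination`."""

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            result = await func(*args, **kwargs)
            sent[destination].append(result)  # "publish" the return value
            return result  # hand it on so outer decorators see it too

        return wrapper

    return decorator


# Hypothetical publishers; a real application would use broker.publisher(...).
publisher1 = toy_publisher("out-1")
publisher2 = toy_publisher("out-2")


@publisher1
@publisher2
async def handle(msg) -> str:
    return "Response"


# One call to the handler delivers the same value to both destinations.
asyncio.run(handle("incoming"))
```

Each wrapper returns the handler's result unchanged, which is why every decorator in the stack receives the same value regardless of the order in which they are applied.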

Additionally, the publisher automatically sends its message with the same correlation_id as the incoming message. As a result, the correlation_id stays the same across the whole message pipeline and all services involved, which lets you collect a trace.
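
The idea of a shared correlation_id can be sketched with plain Python. This is only an illustration under stated assumptions, not how FastStream stores or forwards the value: the Message dataclass and the respond helper are hypothetical names introduced for this example.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class Message:
    """Toy message: a body plus a correlation_id used for tracing."""

    body: str
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))


def respond(incoming: Message, body: str) -> Message:
    """Build an outgoing message that reuses the incoming correlation_id."""
    return Message(body=body, correlation_id=incoming.correlation_id)


# A three-stage pipeline: each stage answers the previous message.
first = Message("order created")          # a new correlation_id is generated here
second = respond(first, "order paid")     # reuses the id from `first`
third = respond(second, "order shipped")  # reuses the same id again

# Collecting a trace is then a matter of filtering by the shared id.
trace = [m.body for m in (first, second, third)
         if m.correlation_id == first.correlation_id]
```

Because every stage copies the identifier instead of generating a new one, all messages that belong to one pipeline run can be grouped by that single value.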


Last update: 2023-09-21