
Publisher Object

The Publisher Object provides a full-featured way to publish messages: it is included in the AsyncAPI representation and supports the testability features. Calling broker.publisher(...) creates a reusable Publisher object.

Additionally, this object can be used as a decorator. The order of the subscriber and publisher decorators doesn't matter, but @publisher can be used only on functions that are also decorated with @broker.subscriber(...).

Note

The publisher uses the handler function's return type annotation to cast the function's return value before sending, so make sure the annotation is accurate.

AIOKafka

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("another-topic")

@publisher
@broker.subscriber("test-topic")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"

Confluent

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("another-topic")

@publisher
@broker.subscriber("test-topic")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"

RabbitMQ

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

publisher = broker.publisher("another-queue")

@publisher
@broker.subscriber("test-queue")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-queue")
async def handle_next(msg: str):
    assert msg == "Hi!"

NATS

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

publisher = broker.publisher("another-subject")

@publisher
@broker.subscriber("test-subject")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-subject")
async def handle_next(msg: str):
    assert msg == "Hi!"

Redis

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

publisher = broker.publisher("another-channel")

@publisher
@broker.subscriber("test-channel")
async def handle() -> str:
    return "Hi!"


@broker.subscriber("another-channel")
async def handle_next(msg: str):
    assert msg == "Hi!"
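
To make the note about return annotations more concrete, here is a toy model in plain Python of what "casting by return annotation" can look like. It is only a sketch and not FastStream's actual implementation: `cast_by_annotation` is a hypothetical helper, and it handles only simple types such as `str` or `int`.

```python
import asyncio
from typing import Any, Callable, get_type_hints


def cast_by_annotation(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical helper: cast the result of ``func`` to its return annotation."""
    # Read the declared return type once, at decoration time.
    return_type = get_type_hints(func).get("return")

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        result = await func(*args, **kwargs)
        # Only plain classes (str, int, float, ...) are handled in this sketch.
        if isinstance(return_type, type) and not isinstance(result, return_type):
            return return_type(result)
        return result

    return wrapper


@cast_by_annotation
async def accurate() -> int:
    # The annotation matches the value, so nothing is converted.
    return 42


@cast_by_annotation
async def inaccurate() -> str:
    # The annotation says str, so the integer is converted before "sending".
    return 42


accurate_result = asyncio.run(accurate())
inaccurate_result = asyncio.run(inaccurate())
```

In this toy model the second handler ends up sending a string even though it returned an integer, which is the kind of surprise an inaccurate annotation can cause.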

Message Broadcasting

The decorator can be applied multiple times to one function to broadcast the function's return value:

@publisher1
@publisher2
@broker.subscriber("in")
async def handle(msg) -> str:
    return "Response"

This way, a copy of the return value is sent to every output topic.

Note

If this subscriber consumes a message in RPC mode, the reply is sent to the RPC channel and to every publisher as well.
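
The fan-out behaviour can be pictured with a small in-memory model. This is a sketch in plain Python, not FastStream code: `toy_publisher`, the `sent` dictionary, and the destination names `out-1` and `out-2` are all hypothetical. Each stacked decorator records a copy of the return value and then passes it on, so outer decorators and the original caller (standing in for an RPC requester) still receive it.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Any], Awaitable[Any]]

# Toy in-memory "broker": destination name -> messages delivered to it.
sent: Dict[str, List[Any]] = {}


def toy_publisher(destination: str) -> Callable[[Handler], Handler]:
    """Hypothetical stand-in for a publisher object used as a decorator."""

    def decorator(func: Handler) -> Handler:
        async def wrapper(msg: Any) -> Any:
            result = await func(msg)
            # Deliver a copy of the return value to this destination.
            sent.setdefault(destination, []).append(result)
            # Pass the result through so outer publishers and the caller get it too.
            return result

        return wrapper

    return decorator


publisher1 = toy_publisher("out-1")
publisher2 = toy_publisher("out-2")


@publisher1
@publisher2
async def handle(msg: Any) -> str:
    return "Response"


# The direct caller plays the role of an RPC requester waiting for a reply.
rpc_reply = asyncio.run(handle("request"))
```

After the call, both `sent["out-1"]` and `sent["out-2"]` hold one copy of `"Response"`, and `rpc_reply` holds the same value.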

Details

Additionally, @publisher automatically sends a message with the same correlation_id as the incoming message. This way, you get the same correlation_id for the entire message pipeline process across all services, allowing you to collect a trace.
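
The correlation_id propagation described above can be sketched as follows. This is a toy model under stated assumptions, not the library's internals: `ToyMessage`, `toy_publisher`, and `outbox` are hypothetical names, and a real broker would carry the identifier in message headers or metadata rather than in a dataclass.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, List


@dataclass
class ToyMessage:
    """Hypothetical message envelope: a body plus a correlation_id."""

    body: Any
    # A fresh identifier is generated only when none is supplied.
    correlation_id: str = field(default_factory=lambda: uuid.uuid4().hex)


# Messages "published" by the toy publisher.
outbox: List[ToyMessage] = []

Handler = Callable[[Any], Awaitable[Any]]


def toy_publisher(func: Handler) -> Callable[[ToyMessage], Awaitable[None]]:
    """Hypothetical decorator that publishes the handler's return value."""

    async def wrapper(incoming: ToyMessage) -> None:
        result = await func(incoming.body)
        # Reuse the incoming correlation_id instead of generating a new one.
        outbox.append(
            ToyMessage(body=result, correlation_id=incoming.correlation_id)
        )

    return wrapper


@toy_publisher
async def handle(body: str) -> str:
    return body.upper()


first = ToyMessage(body="hi")
asyncio.run(handle(first))
```

Because every hop copies the identifier forward, logs from different services can be grouped by a single correlation_id to reconstruct the path of one message.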