Skip to content

Publisher Direct Usage#

The Publisher Object provides a full-featured way to publish messages. It has an AsyncAPI representation and includes testability features.

This method creates a reusable Publisher object that can be used directly to publish a message:

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("another-topic")

@broker.subscriber("test-topic")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("another-topic")

@broker.subscriber("test-topic")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

publisher = broker.publisher("another-queue")

@broker.subscriber("test-queue")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-queue")
async def handle_next(msg: str):
    assert msg == "Hi!"
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

publisher = broker.publisher("another-subject")

@broker.subscriber("test-subject")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-subject")
async def handle_next(msg: str):
    assert msg == "Hi!"
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

publisher = broker.publisher("another-channel")

@broker.subscriber("test-channel")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-channel")
async def handle_next(msg: str):
    assert msg == "Hi!"

It is something in the middle between broker publish and object decorator. It has an AsyncAPI representation and testability features (like the object decorator), but allows you to send different messages to different outputs (like the broker publish).

@broker.subscriber("in")
async def handle(msg) -> str:
    await publisher1.publish("Response-1")
    await publisher2.publish("Response-2")

Note

When using this method, FastStream doesn't reuse the incoming correlation_id to mark outgoing messages with it. You should set it manually if it is required.