Publisher Direct Usage

The Publisher Object provides a full-featured way to publish messages. It has an AsyncAPI representation and includes testability features.

This method creates a reusable Publisher object that can be used directly to publish a message.

Pros and Cons

AsyncAPI support - AsyncAPI is a specification for describing asynchronous APIs used in messaging applications. This method supports this standard: the Publisher object has an AsyncAPI representation.

Testing support - This method has full Testing support.

Broker availability from Context - You can leverage FastStream's Context, a built-in Dependency Injection (DI) container, to work with brokers or other external services.

Optional publication - Because you call publish yourself, you can decide at runtime whether to publish at all.

Reusable - The same Publisher object can be used from any number of handlers.

AIOKafka

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("another-topic")

@broker.subscriber("test-topic")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"

Confluent

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

publisher = broker.publisher("another-topic")

@broker.subscriber("test-topic")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"

RabbitMQ

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

publisher = broker.publisher("another-queue")

@broker.subscriber("test-queue")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-queue")
async def handle_next(msg: str):
    assert msg == "Hi!"

NATS

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

publisher = broker.publisher("another-subject")

@broker.subscriber("test-subject")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-subject")
async def handle_next(msg: str):
    assert msg == "Hi!"

Redis

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

publisher = broker.publisher("another-channel")

@broker.subscriber("test-channel")
async def handle():
    await publisher.publish("Hi!")


@broker.subscriber("another-channel")
async def handle_next(msg: str):
    assert msg == "Hi!"

This approach sits between broker publish and the object decorator. Like the object decorator, it has an AsyncAPI representation and testability features; like broker publish, it lets you send different messages to different outputs.

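# Hypothetical output names, used here for illustration
publisher1 = broker.publisher("out-1")
publisher2 = broker.publisher("out-2")

@broker.subscriber("in")
async def handle(msg) -> None:
    await publisher1.publish("Response-1")
    await publisher2.publish("Response-2")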

Note

When using this method, FastStream does not propagate the incoming correlation_id to outgoing messages. Set it manually if you need it.