Skip to content

Broker Publishing#

The easiest way to publish a message is to use a Broker, which allows you to use it as a publisher client in any applications.

In the FastStream project, this call is not represented in the AsyncAPI scheme. You can use it to send rarely-publishing messages, such as startup or shutdown events.

Pros and Cons

Easy to use - Publishing messages in FastStream is intuitive and requires minimal effort.

Broker availability from Context - You can leverage FastStream's Context, a built-in Dependency Injection (DI) container, to work with brokers or other external services.

No AsyncAPI support - AsyncAPI is a specification for describing asynchronous APIs used in messaging applications. This method currently does not support this standard.

No testing support - This method lacks full Testing support.

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle():
    await broker.publish("Hi!", topic="another-topic")


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", topic="test-topic")
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle():
    await broker.publish("Hi!", topic="another-topic")


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", topic="test-topic")
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle():
    await broker.publish("Hi!", queue="another-queue")


@broker.subscriber("another-queue")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", queue="test-queue")
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle():
    await broker.publish("Hi!", subject="another-subject")


@broker.subscriber("another-subject")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", subject="test-subject")
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle():
    await broker.publish("Hi!", channel="another-channel")


@broker.subscriber("another-channel")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", channel="test-channel")