Skip to content

Broker Publishing#

The easiest way to publish a message is to use a Broker, which allows you to use it as a publisher client in any applications.

In the FastStream project, this call is not represented in the AsyncAPI scheme. You can use it to send rarely-publishing messages, such as startup or shutdown events.

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle():
    await broker.publish("Hi!", topic="another-topic")


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", topic="test-topic")
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle():
    await broker.publish("Hi!", topic="another-topic")


@broker.subscriber("another-topic")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", topic="test-topic")
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle():
    await broker.publish("Hi!", queue="another-queue")


@broker.subscriber("another-queue")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", queue="test-queue")
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle():
    await broker.publish("Hi!", subject="another-subject")


@broker.subscriber("another-subject")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", subject="test-subject")
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle():
    await broker.publish("Hi!", channel="another-channel")


@broker.subscriber("another-channel")
async def handle_next(msg: str):
    assert msg == "Hi!"


@app.after_startup
async def test():
    await broker.publish("", channel="test-channel")