Broker Publishing

The easiest way to publish a message is to use a Broker directly, which lets you use it as a publisher client in any application.
In a FastStream project, this call is not represented in the AsyncAPI schema. You can use it to send rarely published messages, such as startup or shutdown events.
AIOKafka Confluent RabbitMQ NATS Redis
from faststream import FastStream
from faststream.kafka import KafkaBroker
# Broker instance used by the handlers in this example, and the application
# object that is built around it.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.subscriber("test-topic")
async def handle():
    # Takes no parameters; each invocation publishes "Hi!" to "another-topic"
    # by calling the broker object directly.
    await broker.publish(
        "Hi!",
        topic="another-topic",
    )
@broker.subscriber("another-topic")
async def handle_next(msg: str):
    # The payload must be exactly the greeting that is published to
    # "another-topic" elsewhere in this example.
    expected = "Hi!"
    assert msg == expected
@app.after_startup
async def test():
    # Registered through app.after_startup; publishes an empty-string message
    # to "test-topic" using the broker directly.
    await broker.publish(
        "",
        topic="test-topic",
    )
from faststream import FastStream
from faststream.confluent import KafkaBroker
# Broker instance used by the handlers in this example, and the application
# object that is built around it.
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.subscriber("test-topic")
async def handle():
    # Takes no parameters; each invocation publishes "Hi!" to "another-topic"
    # by calling the broker object directly.
    await broker.publish(
        "Hi!",
        topic="another-topic",
    )
@broker.subscriber("another-topic")
async def handle_next(msg: str):
    # The payload must be exactly the greeting that is published to
    # "another-topic" elsewhere in this example.
    expected = "Hi!"
    assert msg == expected
@app.after_startup
async def test():
    # Registered through app.after_startup; publishes an empty-string message
    # to "test-topic" using the broker directly.
    await broker.publish(
        "",
        topic="test-topic",
    )
from faststream import FastStream
from faststream.rabbit import RabbitBroker
# Broker instance used by the handlers in this example, and the application
# object that is built around it.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
@broker.subscriber("test-queue")
async def handle():
    # Takes no parameters; each invocation publishes "Hi!" to "another-queue"
    # by calling the broker object directly.
    await broker.publish(
        "Hi!",
        queue="another-queue",
    )
@broker.subscriber("another-queue")
async def handle_next(msg: str):
    # The payload must be exactly the greeting that is published to
    # "another-queue" elsewhere in this example.
    expected = "Hi!"
    assert msg == expected
@app.after_startup
async def test():
    # Registered through app.after_startup; publishes an empty-string message
    # to "test-queue" using the broker directly.
    await broker.publish(
        "",
        queue="test-queue",
    )
from faststream import FastStream
from faststream.nats import NatsBroker
# Broker instance used by the handlers in this example, and the application
# object that is built around it.
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)
@broker.subscriber("test-subject")
async def handle():
    # Takes no parameters; each invocation publishes "Hi!" to
    # "another-subject" by calling the broker object directly.
    await broker.publish(
        "Hi!",
        subject="another-subject",
    )
@broker.subscriber("another-subject")
async def handle_next(msg: str):
    # The payload must be exactly the greeting that is published to
    # "another-subject" elsewhere in this example.
    expected = "Hi!"
    assert msg == expected
@app.after_startup
async def test():
    # Registered through app.after_startup; publishes an empty-string message
    # to "test-subject" using the broker directly.
    await broker.publish(
        "",
        subject="test-subject",
    )
from faststream import FastStream
from faststream.redis import RedisBroker
# Broker instance used by the handlers in this example, and the application
# object that is built around it.
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)
@broker.subscriber("test-channel")
async def handle():
    # Takes no parameters; each invocation publishes "Hi!" to
    # "another-channel" by calling the broker object directly.
    await broker.publish(
        "Hi!",
        channel="another-channel",
    )
@broker.subscriber("another-channel")
async def handle_next(msg: str):
    # The payload must be exactly the greeting that is published to
    # "another-channel" elsewhere in this example.
    expected = "Hi!"
    assert msg == expected
@app.after_startup
async def test():
    # Registered through app.after_startup; publishes an empty-string message
    # to "test-channel" using the broker directly.
    await broker.publish(
        "",
        channel="test-channel",
    )