Kafka RPC Requests Unfortunately, Kafka has no built-in RPC mechanism or zero-cost topics, but you can emulate such behavior using a messaging pattern.
To implement this, you should create a persistent topic to consume the response stream and match responses with requests using the correlation ID.
This can be easily implemented with FastStream , so let's take a look at the code. First, we will try to write a simple FastStream -based implementation, and then create a reusable tool based on it.
Raw Implementation Let's imagine we have a simple FastStream echo subscriber like this:
from faststream import FastStream
from faststream.kafka import KafkaBroker
broker = KafkaBroker ()
app = FastStream ( broker )
@broker . subscriber ( "echo-topic" )
async def echo_handler ( msg : Any ) -> Any :
return msg
It does nothing but publishes responses to all messages with the reply_to
header.
Now, we want to send a message and consume the echo callback. For this reason, we need to create a reply consumer in our producer service. It can look like the following:
from asyncio import Future
from typing import Annotated
from faststream import FastStream , Context
from faststream.kafka import KafkaBroker , KafkaMessage
broker = KafkaBroker ()
app = FastStream ( broker )
@broker . subscriber ( "responses" )
async def response_handler (
msg : KafkaMessage ,
responses : Annotated [
dict [ str , Future [ bytes ]],
Context ( "responses" , initial = dict ),
],
) -> None :
if ( future := responses . pop ( msg . correlation_id , None )):
future . set_result ( msg . body )
This handler simply maps incoming messages to their requests using the correlation_id
field.
Next, we just need to publish a message with the reply_to = "responses"
header, create a Future object, and wait for it.
@app . after_startup
async def send_request (
responses : Annotated [
dict [ str , Future [ bytes ]],
Context ( "responses" , initial = dict ),
],
) -> None :
correlation_id = str ( uuid4 ())
future = responses [ correlation_id ] = Future [ bytes ]()
await broker . publish (
"echo" , "echo-topic" ,
reply_to = "responses" ,
correlation_id = correlation_id ,
)
data : bytes = await future
assert data == b "echo" # returned from echo
Note
message.correlation_id
and message.reply_to
are FastStream -specific message headers, but you can set them using any Kafka client you are using.
Full Example from asyncio import Future , wait_for
from typing import Annotated , Any
from uuid import uuid4
from faststream import FastStream , Context
from faststream.kafka import KafkaBroker , KafkaMessage
broker = KafkaBroker ()
app = FastStream ( broker )
@broker . subscriber ( "echo-topic" )
async def echo_handler ( msg : Any ) -> Any :
return msg
@broker . subscriber ( "responses" )
async def response_handler (
msg : KafkaMessage ,
responses : Annotated [
dict [ str , Future [ bytes ]],
Context ( "responses" , initial = dict ),
],
) -> None :
if ( future := responses . pop ( msg . correlation_id , None )):
future . set_result ( msg . body )
@app . after_startup
async def send_request (
responses : Annotated [
dict [ str , Future [ bytes ]],
Context ( "responses" , initial = dict ),
],
) -> None :
correlation_id = str ( uuid4 ())
future = responses [ correlation_id ] = Future [ bytes ]()
await broker . publish ( "echo" , "echo-topic" , reply_to = "responses" , correlation_id = correlation_id )
try :
data : bytes = await wait_for ( future , timeout = 10.0 )
except TimeoutError :
responses . pop ( correlation_id , None )
raise
assert data == b "echo"
Reusable Class Now that we have a working Kafka RPC implementation, we can encapsulate it into a reusable class that can be copy-pasted between services.
from uuid import uuid4
from asyncio import Future , wait_for
from faststream.types import SendableMessage
from faststream.kafka import KafkaMessage
class RPCWorker :
def __init__ ( self , broker : KafkaBroker , reply_topic : str ) -> None :
self . responses : dict [ str , Future [ bytes ]] = {}
self . broker = broker
self . reply_topic = reply_topic
self . subscriber = broker . subscriber ( reply_topic )
self . subscriber ( self . _handle_responses )
def _handle_responses ( self , msg : KafkaMessage ) -> None :
"""Our replies subscriber."""
if ( future := self . responses . pop ( msg . correlation_id , None )):
future . set_result ( msg . body )
async def request (
self ,
data : SendableMessage ,
topic : str ,
timeout : float = 10.0 ,
) -> bytes :
correlation_id = str ( uuid4 ())
future = self . responses [ correlation_id ] = Future [ bytes ]()
await broker . publish (
data , topic ,
reply_to = self . reply_topic ,
correlation_id = correlation_id ,
)
try :
response : bytes = await wait_for ( future , timeout = timeout )
except TimeoutError :
self . responses . pop ( correlation_id , None )
raise
else :
return response
Now it can be used in the following way:
from faststream import FastStream
from faststream.kafka import KafkaBroker
broker = KafkaBroker ()
worker = RPCWorker ( broker , reply_topic = "responses" )
app = FastStream ( broker )
@app . after_startup
async def send_request () -> None :
data = await worker . request ( "echo" , "echo-topic" )
assert data == "echo"
Or, if you want to make the RPCWorker
work after startup, you should add a manual start
method to it:
class RPCWorker :
async def start ( self ) -> None :
self . broker . setup_subscriber ( self . subscriber )
await self . subscriber . start ()
Now it can be used after the application has started:
from faststream import FastStream
from faststream.kafka import KafkaBroker
broker = KafkaBroker ()
app = FastStream ( broker )
@app . after_startup
async def send_request () -> None :
worker = RPCWorker ( broker , reply_topic = "responses" )
await worker . start ()
data = await worker . request ( "echo" , "echo-topic" )
assert data == "echo"
Full Class Example from uuid import uuid4
from asyncio import Future , wait_for
from faststream.types import SendableMessage
from faststream.kafka import KafkaMessage
class RPCWorker :
responses : dict [ str , Future [ bytes ]]
def __init__ ( self , broker : KafkaBroker , reply_topic : str ) -> None :
self . responses = {}
self . broker = broker
self . reply_topic = reply_topic
self . subscriber = broker . subscriber ( reply_topic )
self . subscriber ( self . _handle_responses )
async def start ( self ) -> None :
self . broker . setup_subscriber ( self . subscriber )
await self . subscriber . start ()
async def stop ( self ) -> None :
await self . subscriber . close ()
def _handle_responses ( self , msg : KafkaMessage ) -> None :
if ( future := self . responses . pop ( msg . correlation_id , None )):
future . set_result ( msg . body )
async def request (
self ,
data : SendableMessage ,
topic : str ,
timeout : float = 10.0 ,
) -> bytes :
correlation_id = str ( uuid4 ())
future = self . responses [ correlation_id ] = Future [ bytes ]()
await broker . publish (
data , topic ,
reply_to = self . reply_topic ,
correlation_id = correlation_id ,
)
try :
response : bytes = await wait_for ( future , timeout = timeout )
except TimeoutError :
self . responses . pop ( correlation_id , None )
raise
else :
return response