To start consuming from a Kafka topic, simply decorate your consuming function with a
@broker.subscriber(...) decorator, passing a string as a topic key.
In the following example, we will create a simple FastStream app that will consume
HelloWorld messages from a "hello_world" topic.
The full app code looks like this:
Import FastStream and KafkaBroker
To use the @broker.subscriber(...) decorator, we first need to import the base FastStream app and the KafkaBroker class, which we will use to create our broker.
Define the HelloWorld Message Structure
Next, you need to define the structure of the messages you want to consume from the topic using Pydantic. For the guide, we’ll stick to something basic, but you are free to define any complex message structure you wish in your project.
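As an illustration, a basic message model might look like the sketch below. The field name `msg` and its description are assumptions, not a required schema; the point is that Pydantic rejects payloads that do not match the declared structure.

```python
from pydantic import BaseModel, Field, ValidationError


class HelloWorld(BaseModel):
    # Hypothetical single-field message; extend it as your project requires.
    msg: str = Field(..., description="Demo hello world message")


# A well-formed payload is accepted.
hello = HelloWorld(msg="Hello")
print(hello.msg)

# A payload missing the required field is rejected.
try:
    HelloWorld()
except ValidationError:
    print("invalid message rejected")
```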
Create a KafkaBroker
Next, we will create a KafkaBroker object and wrap it in a FastStream object so that we can start our app using the CLI later.
Create a Function that will Consume Messages from a Kafka hello_world Topic
Let’s create a consumer function that will consume HelloWorld messages from the "hello_world" topic and log them.
The function decorated with the @broker.subscriber(...) decorator will be called whenever a message is produced to the subscribed topic.
The message will then be injected into the typed msg argument of the function, and the argument's type annotation will be used to parse the message.
In this example, when a message is sent to the "hello_world" topic, it will be parsed into a HelloWorld instance, and the on_hello_world function will be called with that instance as the msg argument value.