Basic Subscriber#
To start consuming from a Kafka topic, simply decorate your consuming function with the @broker.subscriber(...) decorator, passing the topic name as a string.
In the following example, we will create a simple FastStream app that consumes HelloWorld messages from the "hello_world" topic.

The full app code looks like this:
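A minimal sketch of the complete app, assuming a local Kafka bootstrap server at localhost:9092 (adjust the address, field name, and examples to your project):

```python
from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker

# Broker pointing at a local Kafka instance, wrapped into a FastStream app
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


class HelloWorld(BaseModel):
    # Illustrative message schema: a single text field
    msg: str = Field(
        ...,
        examples=["Hi!"],
        description="Demo hello world message",
    )


@broker.subscriber("hello_world")
async def on_hello_world(msg: HelloWorld, logger: Logger):
    # Log every parsed HelloWorld message received from the topic
    logger.info(msg)
```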
Import FastStream and KafkaBroker#
To use the @broker.subscriber(...) decorator, we first need to import the base FastStream app and the KafkaBroker to create our broker.
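The imports look like this (FastStream and Logger live in the top-level package, KafkaBroker in the Kafka subpackage):

```python
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
```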
Define the HelloWorld Message Structure#
Next, you need to define the structure of the messages you want to consume from the topic using Pydantic. For the guide, we’ll stick to something basic, but you are free to define any complex message structure you wish in your project.
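For example, a simple Pydantic model could look like the sketch below; the msg field name, example value, and description are illustrative:

```python
from pydantic import BaseModel, Field


class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        examples=["Hi!"],
        description="Demo hello world message",
    )
```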
Create a KafkaBroker#
Next, we will create a KafkaBroker object and wrap it into the FastStream object so that we can start our app using the CLI later.
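A sketch of this step, assuming a local bootstrap server at localhost:9092:

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
```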
Create a Function that will Consume Messages from a Kafka hello_world Topic#
Let’s create a consumer function that consumes HelloWorld messages from the "hello_world" topic and logs them.

The function decorated with the @broker.subscriber(...) decorator will be called when a message is produced to Kafka.

The message will then be injected into the typed msg argument of the function, and its type will be used to parse the message.

In this example, when a message is sent to the "hello_world" topic, it will be parsed into a HelloWorld instance, and the on_hello_world function will be called with the parsed object as the msg argument value.
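A sketch of the handler, assuming the broker and the HelloWorld model defined above:

```python
from faststream import Logger


@broker.subscriber("hello_world")
async def on_hello_world(msg: HelloWorld, logger: Logger):
    # msg is already parsed into a HelloWorld instance before the call
    logger.info(msg)
```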
Pattern data access#
You can also use the pattern subscription feature to encode data directly in the topic name. With FastStream, you can easily access this data using the following code:
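A minimal sketch, assuming a topic pattern such as "logs.{level}" and using the Path helper from the top-level faststream package; the pattern, handler name, and placeholder name are illustrative:

```python
from faststream import FastStream, Logger, Path
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber(pattern="logs.{level}")
async def base_handler(
    body: str,
    logger: Logger,
    # The {level} part of the matched topic name is injected here
    level: str = Path(),
):
    logger.info("%s: %s", level, body)
```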