Basic Subscriber#

To start consuming from a Kafka topic, simply decorate your consuming function with a @broker.subscriber(...) decorator, passing the topic name as a string.

In the following example, we will create a simple FastStream app that will consume HelloWorld messages from a "hello_world" topic.

The full app code looks like this:

from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        examples=["Hello"],
        description="Demo hello world message",
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("hello_world")
async def on_hello_world(msg: HelloWorld, logger: Logger):
    logger.info(msg)

Import FastStream and KafkaBroker#

To use the @broker.subscriber(...) decorator, we first need to import the base FastStream app and KafkaBroker to create our broker.

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker

Define the HelloWorld Message Structure#

Next, you need to define the structure of the messages you want to consume from the topic using Pydantic. For the guide, we’ll stick to something basic, but you are free to define any complex message structure you wish in your project.

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        examples=["Hello"],
        description="Demo hello world message",
    )
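Because the subscriber's argument is annotated with this model, FastStream validates each incoming message body against it before calling your function. The snippet below mimics that step manually, assuming Pydantic v2 (model_validate_json); it is an illustration of the validation, not FastStream's internal code:

```python
from pydantic import BaseModel, Field


class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        examples=["Hello"],
        description="Demo hello world message",
    )


# Simulate decoding an incoming JSON message body, as the broker does for us
incoming = HelloWorld.model_validate_json('{"msg": "Hi"}')
print(incoming.msg)
```

A malformed payload (for example, a missing msg field) would raise a pydantic.ValidationError instead of reaching your handler.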

Create a KafkaBroker#

Next, we will create a KafkaBroker object and wrap it in a FastStream object so that we can start our app using the CLI later.

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
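Once the app object exists, the service can be started with the FastStream CLI. Assuming the code above is saved as basic.py (a filename chosen for this example) and a Kafka broker is reachable at localhost:9092, the invocation looks like this:

```shell
# start the FastStream app; "basic" is the module, "app" the FastStream object
faststream run basic:app
```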

Create a Function that will Consume Messages from a Kafka hello_world Topic#

Let’s create a consumer function that will consume HelloWorld messages from the "hello_world" topic and log them.

@broker.subscriber("hello_world")
async def on_hello_world(msg: HelloWorld, logger: Logger):
    logger.info(msg)

The function decorated with the @broker.subscriber(...) decorator will be called when a message is produced to Kafka.

The message will then be injected into the typed msg argument of the function, and its type will be used to parse the message.

In this example case, when the message is sent to a "hello_world" topic, it will be parsed into a HelloWorld class, and the on_hello_world function will be called with the parsed class as the msg argument value.

Pattern data access#

You can also use the pattern subscription feature to encode data directly in the topic name. With FastStream, you can access this data using the following code:

from faststream import Path

@broker.subscriber(pattern="logs.{level}")
async def base_handler(
    body: str,
    level: str = Path(),
):
    ...
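Here, a message published to a topic such as "logs.info" matches the pattern, and Path() injects the captured segment ("info") as the level argument. The regex below is an illustrative sketch of that extraction, not FastStream's actual implementation:

```python
import re

# "logs.{level}" matches topics like "logs.info" or "logs.error";
# the named group stands in for what Path() hands to the subscriber
topic_pattern = re.compile(r"^logs\.(?P<level>[^.]+)$")

match = topic_pattern.match("logs.info")
print(match.group("level"))
```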