Passing Additional Configuration to confluent-kafka-python

The confluent-kafka-python package is a Python wrapper around librdkafka, a C/C++ client library for Apache Kafka.

confluent-kafka-python accepts a config dictionary, which it passes on to librdkafka. librdkafka exposes a large set of configuration properties for tuning the Kafka client.

FastStream likewise lets you pass a config dictionary through to librdkafka, so all of these properties remain available for customization.

Example

The following example uses the config parameter to set topic.metadata.refresh.fast.interval.ms to 300 (milliseconds), overriding its default value of 100.

from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.confluent import KafkaBroker


class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        examples=["Hello"],
        description="Demo hello world message",
    )


config = {"topic.metadata.refresh.fast.interval.ms": 300}
broker = KafkaBroker("localhost:9092", config=config)
app = FastStream(broker)


@broker.subscriber("hello_world")
async def on_hello_world(msg: HelloWorld, logger: Logger):
    logger.info(msg)

In the same way, you can use the config parameter to pass any other configuration property to librdkafka.
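Because config is an ordinary dictionary, several properties can be combined before the broker is created. The sketch below is only an illustration: BASE_CONFIG and make_config are hypothetical names that are not part of FastStream, and socket.keepalive.enable and client.id are librdkafka properties chosen as examples. The broker line is left as a comment so the snippet runs without a Kafka installation.

```python
from typing import Any, Dict, Optional

# Hypothetical shared defaults; the keys are librdkafka property names.
BASE_CONFIG: Dict[str, Any] = {
    "topic.metadata.refresh.fast.interval.ms": 300,
    "socket.keepalive.enable": True,
}


def make_config(overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Return a copy of BASE_CONFIG with the given overrides applied."""
    merged = dict(BASE_CONFIG)  # copy, so the shared defaults stay untouched
    merged.update(overrides or {})
    return merged


config = make_config({"client.id": "hello-world-service"})

# Assumed usage, mirroring the example above:
# broker = KafkaBroker("localhost:9092", config=config)
```

Keeping the defaults in one place and copying them for each broker means a per-service override does not leak into other brokers that share the same base settings.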