Skip to content

create_topics

faststream.confluent.client.create_topics #

create_topics(topics: List[str], config: Dict[str, Optional[Union[str, int, float, bool, Any]]]) -> None

Creates Kafka topics using the provided configuration.

PARAMETER DESCRIPTION
topics

A list of topic names to create.

TYPE: List[str]

config

A dictionary of configuration options for the AdminClient.

TYPE: Dict[str, Optional[Union[str, int, float, bool, Any]]]

Source code in faststream/confluent/client.py
def create_topics(
    topics: List[str], config: Dict[str, Optional[Union[str, int, float, bool, Any]]]
) -> None:
    """Creates Kafka topics using the provided configuration.

    Args:
        topics (List[str]): A list of topic names to create.
        config (Dict[str, Optional[Union[str, int, float, bool, Any]]]): A dictionary of configuration options for the AdminClient.
    """
    needed_config_params = [
        "allow.auto.create.topics",
        "bootstrap.servers",
        "client.id",
        "request.timeout.ms",
        "metadata.max.age.ms",
        "security.protocol",
        "connections.max.idle.ms",
        "sasl.mechanism",
        "sasl.username",
        "sasl.password",
        "sasl.kerberos.service.name",
    ]

    admin_client_config = {x: config[x] for x in needed_config_params if x in config}
    admin_client = AdminClient(admin_client_config)
    fs = admin_client.create_topics(
        [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
    )

    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            logger.info(f"Topic {topic} created at create_topics")
        except Exception as e:  # noqa: PERF203
            logger.warning(f"Failed to create topic {topic}: {e}")