Skip to content

create_topics

faststream.confluent.client.create_topics #

create_topics(topics, config, logger_=None)
Source code in faststream/confluent/client.py
def create_topics(
    topics: List[str],
    config: Dict[str, Optional[Union[str, int, float, bool, Any]]],
    logger_: Optional["LoggerProto"] = None,
) -> None:
    logger_ = logger_ or faststream_logger

    """Creates Kafka topics using the provided configuration."""
    admin_client = AdminClient(
        {x: config[x] for x in ADMINCLIENT_CONFIG_PARAMS if x in config}
    )

    fs = admin_client.create_topics(
        [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
    )

    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
        except Exception as e:  # noqa: PERF203
            if "TOPIC_ALREADY_EXISTS" not in str(e):
                logger_.log(logging.WARN, f"Failed to create topic {topic}: {e}")
        else:
            logger_.log(logging.INFO, f"Topic `{topic}` created.")