def create_topics(
    topics: List[str], config: Dict[str, Optional[Union[str, int, float, bool, Any]]]
) -> None:
    """Create Kafka topics using the provided configuration.

    Every topic is requested with a single partition and a replication
    factor of 1. Creation is best-effort: a failure for one topic is logged
    as a warning and does not prevent the remaining results from being
    checked; nothing is raised to the caller for per-topic failures.

    Args:
        topics: Names of the topics to create. An empty list is a no-op and
            repeated names are requested only once.
        config: Configuration options. Only the keys listed in
            ``needed_config_params`` below are forwarded to the AdminClient;
            any other key is ignored.
    """
    # Nothing to create: return before building a client. (Assuming
    # AdminClient is confluent-kafka's, its create_topics() raises ValueError
    # when handed an empty list, so this turns a crash into a no-op.)
    if not topics:
        return

    needed_config_params = (
        "allow.auto.create.topics",
        "bootstrap.servers",
        "client.id",
        "request.timeout.ms",
        "metadata.max.age.ms",
        "security.protocol",
        "connections.max.idle.ms",
        "sasl.mechanism",
        "sasl.username",
        "sasl.password",
        "sasl.kerberos.service.name",
    )
    # Forward only the options the admin client needs; keys absent from
    # `config` are simply left out.
    admin_client_config = {
        key: config[key] for key in needed_config_params if key in config
    }

    # The results below are keyed by topic name, so a repeated name could
    # never be reported separately; request each name once, keeping order.
    unique_topics = list(dict.fromkeys(topics))

    admin_client = AdminClient(admin_client_config)
    fs = admin_client.create_topics(
        [
            NewTopic(topic, num_partitions=1, replication_factor=1)
            for topic in unique_topics
        ]
    )
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            logger.info(f"Topic {topic} created at create_topics")
        except Exception as e:  # noqa: PERF203
            # Best-effort: report the failure and move on to the next topic.
            logger.warning(f"Failed to create topic {topic}: {e}")