def create_topics(
    topics: List[str],
    config: Dict[str, Optional[Union[str, int, float, bool, Any]]],
    logger_: Optional["LoggerProto"] = None,
) -> None:
    """Create Kafka topics using the provided configuration.

    Every topic is requested with a single partition and a replication
    factor of 1. Per-topic failures are logged rather than raised, and an
    error whose text contains ``TOPIC_ALREADY_EXISTS`` is ignored silently.

    Args:
        topics: Names of the topics to create. Nothing is done when empty.
        config: Client configuration. Only the keys listed in
            ``ADMINCLIENT_CONFIG_PARAMS`` are forwarded to the admin client.
        logger_: Logger used for reporting; falls back to
            ``faststream_logger`` when omitted (or falsy).
    """
    if not topics:
        # Nothing to create: skip building a client and avoid handing an
        # empty request to the admin API.
        return

    logger_ = logger_ or faststream_logger

    # Forward only the options the admin client is known to accept.
    admin_config = {
        key: config[key] for key in ADMINCLIENT_CONFIG_PARAMS if key in config
    }
    admin_client = AdminClient(admin_config)

    futures = admin_client.create_topics(
        [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
    )

    for topic, future in futures.items():
        try:
            future.result()  # The result itself is None
        except Exception as e:  # noqa: PERF203
            # An already-existing topic is not worth reporting; anything
            # else is surfaced as a warning but never propagated.
            if "TOPIC_ALREADY_EXISTS" not in str(e):
                logger_.log(logging.WARNING, f"Failed to create topic {topic}: {e}")
        else:
            # Pairs with ``try``: reached only when ``result()`` did not raise.
            logger_.log(logging.INFO, f"Topic `{topic}` created.")