Skip to content

settings_provider_factory

faststream.confluent.prometheus.provider.settings_provider_factory #

settings_provider_factory(msg)
Source code in faststream/confluent/prometheus/provider.py
def settings_provider_factory(
    msg: Union["Message", Sequence["Message"], None],
) -> Union[
    ConfluentMetricsSettingsProvider,
    BatchConfluentMetricsSettingsProvider,
]:
    if isinstance(msg, Sequence):
        return BatchConfluentMetricsSettingsProvider()
    else:
        return ConfluentMetricsSettingsProvider()