Skip to content

settings_provider_factory

faststream.nats.prometheus.provider.settings_provider_factory

settings_provider_factory(msg)
Source code in faststream/nats/prometheus/provider.py
def settings_provider_factory(
    msg: Union["Msg", Sequence["Msg"], None],
) -> Union[
    NatsMetricsSettingsProvider,
    BatchNatsMetricsSettingsProvider,
    None,
]:
    """Select a metrics settings provider based on the type of ``msg``.

    Args:
        msg: A single message, a sequence of messages, or ``None``.

    Returns:
        A new ``BatchNatsMetricsSettingsProvider`` when ``msg`` is a
        ``Sequence``; a new ``NatsMetricsSettingsProvider`` when ``msg`` is a
        ``Msg`` instance or ``None``; otherwise ``None``.
    """
    # The sequence check comes first so that batches never reach the
    # single-message branch.
    if isinstance(msg, Sequence):
        return BatchNatsMetricsSettingsProvider()

    if msg is None or isinstance(msg, Msg):
        return NatsMetricsSettingsProvider()

    # Anything else: KeyValue and Object Storage watch cases.
    return None