Skip to content

settings_provider_factory

faststream.redis.prometheus.provider.settings_provider_factory #

settings_provider_factory(msg)
Source code in faststream/redis/prometheus/provider.py
def settings_provider_factory(
    msg: Optional["AnyDict"],
) -> Union[
    RedisMetricsSettingsProvider,
    BatchRedisMetricsSettingsProvider,
]:
    if msg is not None and msg.get("type", "").startswith("b"):
        return BatchRedisMetricsSettingsProvider()
    else:
        return RedisMetricsSettingsProvider()