Skip to content

BasePrometheusMiddleware

faststream.prometheus.BasePrometheusMiddleware #

BasePrometheusMiddleware(*, settings_provider_factory, registry, app_name=EMPTY, metrics_prefix='faststream', received_messages_size_buckets=None)
Source code in faststream/prometheus/middleware.py
def __init__(
    self,
    *,
    settings_provider_factory: Callable[
        [Any], Optional[MetricsSettingsProvider[Any]]
    ],
    registry: "CollectorRegistry",
    app_name: str = EMPTY,
    metrics_prefix: str = "faststream",
    received_messages_size_buckets: Optional[Sequence[float]] = None,
):
    if app_name is EMPTY:
        app_name = metrics_prefix

    self._settings_provider_factory = settings_provider_factory
    self._metrics_container = MetricsContainer(
        registry,
        metrics_prefix=metrics_prefix,
        received_messages_size_buckets=received_messages_size_buckets,
    )
    self._metrics_manager = MetricsManager(
        self._metrics_container,
        app_name=app_name,
    )