Skip to content

TelemetryMiddleware

faststream.opentelemetry.TelemetryMiddleware

TelemetryMiddleware(*, settings_provider_factory, tracer_provider=None, meter_provider=None, meter=None, include_messages_counters=False)
Source code in faststream/opentelemetry/middleware.py
def __init__(
    self,
    *,
    settings_provider_factory: Callable[
        [Any], Optional[TelemetrySettingsProvider[Any]]
    ],
    tracer_provider: Optional["TracerProvider"] = None,
    meter_provider: Optional["MeterProvider"] = None,
    meter: Optional["Meter"] = None,
    include_messages_counters: bool = False,
) -> None:
    """Set up the tracer, meter, metrics container and settings factory.

    All arguments are keyword-only.

    Args:
        settings_provider_factory: Callable that takes one argument and
            returns a ``TelemetrySettingsProvider`` or ``None``. It is
            stored unchanged and not invoked here.
        tracer_provider: Optional tracer provider, forwarded to
            ``_get_tracer`` to obtain the tracer.
        meter_provider: Optional meter provider, forwarded to
            ``_get_meter`` together with ``meter``.
        meter: Optional meter, forwarded to ``_get_meter`` together with
            ``meter_provider``.
            NOTE(review): presumably an explicit ``meter`` takes
            precedence over ``meter_provider`` -- confirm in
            ``_get_meter``.
        include_messages_counters: Flag passed as the second argument to
            ``_MetricsContainer``.
    """
    # Resolve the collaborators first, in the same order as before
    # (tracer, then meter, then the metrics container built on the meter).
    resolved_tracer = _get_tracer(tracer_provider)
    resolved_meter = _get_meter(meter_provider, meter)
    metrics_container = _MetricsContainer(
        resolved_meter,
        include_messages_counters,
    )

    self._tracer = resolved_tracer
    self._meter = resolved_meter
    self._metrics = metrics_container
    self._settings_provider_factory = settings_provider_factory