def __init__(
    self,
    *,
    settings_provider_factory: Callable[
        [Any], Optional[MetricsSettingsProvider[Any]]
    ],
    registry: "CollectorRegistry",
    app_name: str = EMPTY,
    metrics_prefix: str = "faststream",
    received_messages_size_buckets: Optional[Sequence[float]] = None,
):
    """Wire up the metrics container and manager for this instance.

    All arguments are keyword-only.

    Args:
        settings_provider_factory: Callable stored as-is on the instance
            for later use; per its annotation it takes one argument and
            returns a ``MetricsSettingsProvider`` or ``None``.
        registry: Collector registry handed to ``MetricsContainer``.
        app_name: Application name forwarded to ``MetricsManager``. When
            left at the ``EMPTY`` sentinel, ``metrics_prefix`` is used
            in its place.
        metrics_prefix: Prefix forwarded to ``MetricsContainer``.
        received_messages_size_buckets: Optional bucket boundaries
            forwarded unchanged to ``MetricsContainer``.
    """
    self._settings_provider_factory = settings_provider_factory

    container = MetricsContainer(
        registry,
        metrics_prefix=metrics_prefix,
        received_messages_size_buckets=received_messages_size_buckets,
    )
    self._metrics_container = container

    # An omitted app name (still the EMPTY sentinel) defaults to the prefix.
    effective_app_name = metrics_prefix if app_name is EMPTY else app_name
    self._metrics_manager = MetricsManager(
        container,
        app_name=effective_app_name,
    )