Skip to content

MetricsManager

faststream.prometheus.manager.MetricsManager #

MetricsManager(container, *, app_name='faststream')
Source code in faststream/prometheus/manager.py
def __init__(self, container: MetricsContainer, *, app_name: str = "faststream"):
    self._container = container
    self._app_name = app_name

add_received_message #

add_received_message(broker, handler, amount=1)
Source code in faststream/prometheus/manager.py
def add_received_message(self, broker: str, handler: str, amount: int = 1) -> None:
    self._container.received_messages_total.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
    ).inc(amount)

observe_received_messages_size #

observe_received_messages_size(broker, handler, size)
Source code in faststream/prometheus/manager.py
def observe_received_messages_size(
    self,
    broker: str,
    handler: str,
    size: int,
) -> None:
    self._container.received_messages_size_bytes.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
    ).observe(size)

add_received_message_in_process #

add_received_message_in_process(broker, handler, amount=1)
Source code in faststream/prometheus/manager.py
def add_received_message_in_process(
    self,
    broker: str,
    handler: str,
    amount: int = 1,
) -> None:
    self._container.received_messages_in_process.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
    ).inc(amount)

remove_received_message_in_process #

remove_received_message_in_process(broker, handler, amount=1)
Source code in faststream/prometheus/manager.py
def remove_received_message_in_process(
    self,
    broker: str,
    handler: str,
    amount: int = 1,
) -> None:
    self._container.received_messages_in_process.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
    ).dec(amount)

add_received_processed_message #

add_received_processed_message(broker, handler, status, amount=1)
Source code in faststream/prometheus/manager.py
def add_received_processed_message(
    self,
    broker: str,
    handler: str,
    status: ProcessingStatus,
    amount: int = 1,
) -> None:
    self._container.received_processed_messages_total.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
        status=status.value,
    ).inc(amount)

observe_received_processed_message_duration #

observe_received_processed_message_duration(duration, broker, handler)
Source code in faststream/prometheus/manager.py
def observe_received_processed_message_duration(
    self,
    duration: float,
    broker: str,
    handler: str,
) -> None:
    self._container.received_processed_messages_duration_seconds.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
    ).observe(duration)

add_received_processed_message_exception #

add_received_processed_message_exception(broker, handler, exception_type)
Source code in faststream/prometheus/manager.py
def add_received_processed_message_exception(
    self,
    broker: str,
    handler: str,
    exception_type: str,
) -> None:
    self._container.received_processed_messages_exceptions_total.labels(
        app_name=self._app_name,
        broker=broker,
        handler=handler,
        exception_type=exception_type,
    ).inc()

add_published_message #

add_published_message(broker, destination, status, amount=1)
Source code in faststream/prometheus/manager.py
def add_published_message(
    self,
    broker: str,
    destination: str,
    status: PublishingStatus,
    amount: int = 1,
) -> None:
    self._container.published_messages_total.labels(
        app_name=self._app_name,
        broker=broker,
        destination=destination,
        status=status.value,
    ).inc(amount)

observe_published_message_duration #

observe_published_message_duration(duration, broker, destination)
Source code in faststream/prometheus/manager.py
def observe_published_message_duration(
    self,
    duration: float,
    broker: str,
    destination: str,
) -> None:
    self._container.published_messages_duration_seconds.labels(
        app_name=self._app_name,
        broker=broker,
        destination=destination,
    ).observe(duration)

add_published_message_exception #

add_published_message_exception(broker, destination, exception_type)
Source code in faststream/prometheus/manager.py
def add_published_message_exception(
    self,
    broker: str,
    destination: str,
    exception_type: str,
) -> None:
    self._container.published_messages_exceptions_total.labels(
        app_name=self._app_name,
        broker=broker,
        destination=destination,
        exception_type=exception_type,
    ).inc()