Skip to content

Prometheus#

Prometheus is an open-source monitoring and alerting toolkit originally built at SoundCloud. With a focus on reliability, robustness, and easy scalability, Prometheus allows users to collect metrics, scrape data from various sources, store them efficiently, and query them in real-time. Its flexible data model, powerful query language, and seamless integration with Grafana make it a popular choice for monitoring the health and performance of systems and applications.

FastStream Metrics#

To add a metrics to your broker, you need to:

  1. Install FastStream with prometheus-client

    pip install faststream[prometheus]
    
  2. Add PrometheusMiddleware to your broker

from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.prometheus import KafkaPrometheusMiddleware
from prometheus_client import CollectorRegistry

registry = CollectorRegistry()

broker = KafkaBroker(
    middlewares=(
        KafkaPrometheusMiddleware(registry=registry),
    )
)
app = FastStream(broker)
from faststream import FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.prometheus import KafkaPrometheusMiddleware
from prometheus_client import CollectorRegistry

registry = CollectorRegistry()

broker = KafkaBroker(
    middlewares=(
        KafkaPrometheusMiddleware(registry=registry),
    )
)
app = FastStream(broker)
from faststream import FastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.prometheus import RabbitPrometheusMiddleware
from prometheus_client import CollectorRegistry

registry = CollectorRegistry()

broker = RabbitBroker(
    middlewares=(
        RabbitPrometheusMiddleware(registry=registry),
    )
)
app = FastStream(broker)
from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.nats.prometheus import NatsPrometheusMiddleware
from prometheus_client import CollectorRegistry

registry = CollectorRegistry()

broker = NatsBroker(
    middlewares=(
        NatsPrometheusMiddleware(registry=registry),
    )
)
app = FastStream(broker)
from faststream import FastStream
from faststream.redis import RedisBroker
from faststream.redis.prometheus import RedisPrometheusMiddleware
from prometheus_client import CollectorRegistry

registry = CollectorRegistry()

broker = RedisBroker(
    middlewares=(
        RedisPrometheusMiddleware(registry=registry),
    )
)
app = FastStream(broker)

Exposing the /metrics endpoint#

The way Prometheus works requires the service to expose an HTTP endpoint for analysis. By convention, this is a GET endpoint, and its path is usually /metrics.

FastStream's built-in ASGI support allows you to expose endpoints in your application.

A convenient way to serve this endpoint is to use make_asgi_app from prometheus_client, passing in the registry that was passed to PrometheusMiddleware.

from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.prometheus import KafkaPrometheusMiddleware
from prometheus_client import CollectorRegistry, make_asgi_app

registry = CollectorRegistry()

broker = KafkaBroker(
    middlewares=(
        KafkaPrometheusMiddleware(registry=registry),
    )
)
app = AsgiFastStream(
    broker,
    asgi_routes=[
        ("/metrics", make_asgi_app(registry)),
    ]
)
from faststream.asgi import AsgiFastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.prometheus import KafkaPrometheusMiddleware
from prometheus_client import CollectorRegistry, make_asgi_app

registry = CollectorRegistry()

broker = KafkaBroker(
    middlewares=(
        KafkaPrometheusMiddleware(registry=registry),
    )
)
app = AsgiFastStream(
    broker,
    asgi_routes=[
        ("/metrics", make_asgi_app(registry)),
    ]
)
from faststream.asgi import AsgiFastStream
from faststream.rabbit import RabbitBroker
from faststream.rabbit.prometheus import RabbitPrometheusMiddleware
from prometheus_client import CollectorRegistry, make_asgi_app

registry = CollectorRegistry()

broker = RabbitBroker(
    middlewares=(
        RabbitPrometheusMiddleware(registry=registry),
    )
)
app = AsgiFastStream(
    broker,
    asgi_routes=[
        ("/metrics", make_asgi_app(registry)),
    ]
)
from faststream.asgi import AsgiFastStream
from faststream.nats import NatsBroker
from faststream.nats.prometheus import NatsPrometheusMiddleware
from prometheus_client import CollectorRegistry, make_asgi_app

registry = CollectorRegistry()

broker = NatsBroker(
    middlewares=(
        NatsPrometheusMiddleware(registry=registry),
    )
)
app = AsgiFastStream(
    broker,
    asgi_routes=[
        ("/metrics", make_asgi_app(registry)),
    ]
)
from faststream.asgi import AsgiFastStream
from faststream.redis import RedisBroker
from faststream.redis.prometheus import RedisPrometheusMiddleware
from prometheus_client import CollectorRegistry, make_asgi_app

registry = CollectorRegistry()

broker = RedisBroker(
    middlewares=(
        RedisPrometheusMiddleware(registry=registry),
    )
)
app = AsgiFastStream(
    broker,
    asgi_routes=[
        ("/metrics", make_asgi_app(registry)),
    ]
)

Exported metrics#

Metric Type Description Labels
received_messages_total Counter The metric is incremented each time the application receives a message.

This is necessary to count messages that the application has received but has not yet started processing.
app_name, broker, handler
received_messages_size_bytes Histogram The metric is filled with the sizes of received messages. When a message is received, the size of its body in bytes is calculated and written to the metric.

Useful for analyzing the sizes of incoming messages, also in cases when the application receives messages of unexpected sizes.
app_name, broker, handler
received_messages_in_process Gauge The metric is incremented when the message processing starts and decremented when the processing ends.

It is necessary to count the number of messages that the application processes.

Such a metric will help answer the question: is there a need to scale the service?
app_name, broker, handler
received_processed_messages_total Counter The metric is incremented after a message is processed, regardless of whether the processing ended with a success or an error.

This metric allows you to analyze the number of processed messages and their statuses.
app_name, broker, handler, status
received_processed_messages_duration_seconds Histogram The metric is filled with the message processing time regardless of whether the processing ended with a success or an error.

Time stamps are recorded just before and immediately after the processing.

Then the metric is filled with their difference (in seconds).
app_name, broker, handler
received_processed_messages_exceptions_total Counter The metric is incremented if any exception occurred while processing a message (except AckMessage, NackMessage, RejectMessage and SkipMessage).

It can be used to draw conclusions about how many and what kind of exceptions occurred while processing messages.
app_name, broker, handler, exception_type
published_messages_total Counter The metric is incremented when messages are sent, regardless of whether the sending was successful or not. app_name, broker, destination, status
published_messages_duration_seconds Histogram The metric is filled with the time the message was sent, regardless of whether the sending was successful or failed.

Timestamps are written immediately before and immediately after sending.

Then the metric is filled with their difference (in seconds).
app_name, broker, destination
published_messages_exceptions_total Counter The metric increases if any exception occurred while sending a message.

You can draw conclusions about how many and what exceptions occurred while sending messages.
app_name, broker, destination, exception_type

Labels#

Label Description Values
app_name The name of the application, which the user can specify himself faststream by default
broker Broker name kafka, rabbit, nats, redis
handler Where the message came from
status (while receiving) Message processing status acked, nacked, rejected, skipped, error
exception_type (while receiving) Exception type when processing message
status (while publishing) Message publishing status success, error
destination Where the message is sent
exception_type (while publishing) Exception type when publishing message

Grafana dashboard#

You can import the Grafana dashboard to visualize the metrics collected by middleware.

Enter the dashboard URL https://grafana.com/grafana/dashboards/22130-faststream-metrics/ (or just the ID, 22130), and click on Load.

HTML-page Import dashboard

An example application with configured metrics, Prometheus and Grafana.

HTML-page Grafana dashboard