Skip to content

PrometheusMiddleware

faststream.prometheus.middleware.PrometheusMiddleware #

PrometheusMiddleware(msg=None, *, settings_provider_factory, metrics_manager)

Bases: BaseMiddleware

Source code in faststream/prometheus/middleware.py
def __init__(
    self,
    msg: Optional[Any] = None,
    *,
    settings_provider_factory: Callable[
        [Any], Optional[MetricsSettingsProvider[Any]]
    ],
    metrics_manager: MetricsManager,
) -> None:
    """Store the metrics manager and resolve a settings provider for ``msg``.

    Args:
        msg: Message object (or ``None``); handed to the factory and then to
            the base-class initializer.
        settings_provider_factory: Callable invoked once with ``msg``; whatever
            it returns (possibly ``None``) becomes the settings provider.
        metrics_manager: Object kept on the instance for recording metrics.
    """
    self._metrics_manager = metrics_manager
    provider = settings_provider_factory(msg)
    self._settings_provider = provider
    # The base initializer is given the same message last.
    super().__init__(msg)

msg instance-attribute #

msg = msg

on_receive async #

on_receive()

Hook to call on message receive.

Source code in faststream/broker/middlewares/base.py
async def on_receive(self) -> None:
    """Run when a message is received; this base hook does nothing."""
    return None

after_processed async #

after_processed(exc_type=None, exc_val=None, exc_tb=None)

Asynchronously called after processing.

Source code in faststream/broker/middlewares/base.py
async def after_processed(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> Optional[bool]:
    """Run after processing has finished.

    The three exception arguments are accepted but not inspected here;
    this base implementation always answers ``False``.
    """
    outcome = False
    return outcome

on_consume async #

on_consume(msg)

Asynchronously consumes a message.

Source code in faststream/broker/middlewares/base.py
async def on_consume(self, msg: "StreamMessage[Any]") -> "StreamMessage[Any]":
    """Run before a message is consumed; returns the message it was given.

    This base implementation hands ``msg`` back untouched.
    """
    message = msg
    return message

after_consume async #

after_consume(err)

Handle the outcome of consuming a message: re-raise the error if one is given, otherwise do nothing.

Source code in faststream/broker/middlewares/base.py
async def after_consume(self, err: Optional[Exception]) -> None:
    """Run after consumption; propagate ``err`` when one is supplied.

    Raises:
        Exception: The very ``err`` object passed in, if it is not ``None``.
    """
    if err is None:
        return
    raise err

on_publish async #

on_publish(msg, *args, **kwargs)

Asynchronously handle a publish event.

Source code in faststream/broker/middlewares/base.py
async def on_publish(self, msg: Any, *args: Any, **kwargs: Any) -> Any:
    """Run before a message is published; returns the message it was given.

    Extra positional and keyword arguments are accepted but ignored by this
    base implementation.
    """
    outgoing = msg
    return outgoing

after_publish async #

after_publish(err)

Handle the after-publish event: re-raise the error if one is given, otherwise do nothing.

Source code in faststream/broker/middlewares/base.py
async def after_publish(self, err: Optional[Exception]) -> None:
    """Run after publishing; propagate ``err`` when one is supplied.

    Raises:
        Exception: The very ``err`` object passed in, if it is not ``None``.
    """
    if err is None:
        return
    raise err

consume_scope async #

consume_scope(call_next, msg)
Source code in faststream/prometheus/middleware.py
async def consume_scope(
    self,
    call_next: "AsyncFuncAny",
    msg: "StreamMessage[Any]",
) -> Any:
    """Wrap message consumption with metric recording.

    Without a settings provider the message goes straight to ``call_next``
    and nothing is recorded. Otherwise the received count, the message size
    and the in-process gauge are updated before the call, and duration,
    in-process removal and the processed counter (with a status) are
    recorded afterwards, whether the call succeeded or raised.

    Args:
        call_next: Awaitable callable that continues the consume chain.
        msg: Message being consumed.

    Returns:
        Whatever ``call_next`` returns.

    Raises:
        Exception: Anything raised by ``self.on_consume`` or ``call_next`` is
            re-raised after the metrics have been updated.
    """
    provider = self._settings_provider
    if provider is None:
        return await call_next(msg)

    metrics = self._metrics_manager
    messaging_system = provider.messaging_system
    consume_attrs = provider.get_consume_attrs_from_message(msg)
    destination_name = consume_attrs["destination_name"]
    count = consume_attrs["messages_count"]

    # Every metric call below carries the same broker/handler labels.
    labels = {"broker": messaging_system, "handler": destination_name}

    metrics.add_received_message(amount=count, **labels)
    metrics.observe_received_messages_size(
        size=consume_attrs["message_size"],
        **labels,
    )
    metrics.add_received_message_in_process(amount=count, **labels)

    failure: Optional[Exception] = None
    started_at = time.perf_counter()

    try:
        result = await call_next(await self.on_consume(msg))

    except Exception as exc:
        failure = exc

        # Ignored exceptions are not counted as processing exceptions.
        if not isinstance(failure, IgnoredException):
            metrics.add_received_processed_message_exception(
                exception_type=type(failure).__name__,
                **labels,
            )
        raise

    finally:
        metrics.observe_received_processed_message_duration(
            duration=time.perf_counter() - started_at,
            **labels,
        )
        metrics.remove_received_message_in_process(amount=count, **labels)

        if msg.committed or failure:
            # Lookup order: ack status, then exception type, then plain error.
            status = (
                PROCESSING_STATUS_BY_ACK_STATUS.get(msg.committed)  # type: ignore[arg-type]
                or PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(type(failure))
                or ProcessingStatus.error
            )
        else:
            status = ProcessingStatus.acked

        metrics.add_received_processed_message(
            amount=count,
            status=status,
            **labels,
        )

    return result

publish_scope async #

publish_scope(call_next, msg, *args, **kwargs)
Source code in faststream/prometheus/middleware.py
async def publish_scope(
    self,
    call_next: "AsyncFunc",
    msg: Any,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Wrap message publishing with metric recording.

    Without a settings provider the call is forwarded to ``call_next``
    unchanged and nothing is recorded. Otherwise the publish duration and a
    published-message counter (success or error) are always recorded, plus
    an exception counter when the publish raises.

    Args:
        call_next: Awaitable callable that continues the publish chain.
        msg: First message to publish.
        *args: Further positional arguments; each one is counted as a message.
        **kwargs: Publish options; also used to look up the destination name.

    Returns:
        Whatever ``call_next`` returns.

    Raises:
        Exception: Anything raised by ``self.on_publish`` or ``call_next`` is
            re-raised after the metrics have been updated.
    """
    provider = self._settings_provider
    if provider is None:
        return await call_next(msg, *args, **kwargs)

    metrics = self._metrics_manager
    destination_name = provider.get_publish_destination_name_from_kwargs(kwargs)
    messaging_system = provider.messaging_system

    # Every metric call below carries the same broker/destination labels.
    labels = {"broker": messaging_system, "destination": destination_name}

    # ``msg`` plus every extra positional argument.
    messages_count = 1 + len(args)

    failure: Optional[Exception] = None
    started_at = time.perf_counter()

    try:
        outgoing = await self.on_publish(msg, *args, **kwargs)
        result = await call_next(outgoing, *args, **kwargs)

    except Exception as exc:
        failure = exc
        metrics.add_published_message_exception(
            exception_type=type(failure).__name__,
            **labels,
        )
        raise

    finally:
        metrics.observe_published_message_duration(
            duration=time.perf_counter() - started_at,
            **labels,
        )

        if failure:
            status = PublishingStatus.error
        else:
            status = PublishingStatus.success

        metrics.add_published_message(
            amount=messages_count,
            status=status,
            **labels,
        )

    return result