Skip to content

CriticalLogMiddleware

faststream.broker.middlewares.logging.CriticalLogMiddleware #

CriticalLogMiddleware(logger, log_level)

Bases: BaseMiddleware

A middleware class for logging critical errors.

Initialize the class.

Source code in faststream/broker/middlewares/logging.py
def __init__(self, logger: Optional["LoggerProto"], log_level: int) -> None:
    """Store the logger and the log level on the instance.

    Args:
        logger: Logger object to keep; ``None`` is stored unchanged.
        log_level: Numeric log level to keep.
    """
    self.logger, self.log_level = logger, log_level

msg instance-attribute #

msg = msg

logger instance-attribute #

logger = logger

log_level instance-attribute #

log_level = log_level

on_receive async #

on_receive()

Hook to call on message receive.

Source code in faststream/broker/middlewares/base.py
async def on_receive(self) -> None:
    """Hook for the message-receive event; this implementation does nothing."""
    return None

after_consume async #

after_consume(err)

A function to handle the result of consuming a resource asynchronously.

Source code in faststream/broker/middlewares/base.py
async def after_consume(self, err: Optional[Exception]) -> None:
    """Re-raise the given error, if there is one.

    Args:
        err: Exception to re-raise, or ``None`` to do nothing.

    Raises:
        Exception: ``err`` itself whenever it is not ``None``.
    """
    if err is None:
        return
    raise err

consume_scope async #

consume_scope(call_next, msg)

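Pass the message through on_consume, await call_next with the result and return its value; any Exception raised on the way is captured and handed to after_consume, which is always awaited before returning.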

Source code in faststream/broker/middlewares/base.py
async def consume_scope(
    self,
    call_next: "AsyncFuncAny",
    msg: "StreamMessage[Any]",
) -> Any:
    """Run the consume pipeline around ``call_next``.

    The message is first passed through ``self.on_consume``; the value it
    returns is handed to ``call_next``. ``self.after_consume`` is always
    awaited afterwards, receiving the ``Exception`` raised by either step or
    ``None`` when both succeeded.

    Args:
        call_next: Awaitable callable invoked with the ``on_consume`` result.
        msg: Incoming message forwarded to ``on_consume``.

    Returns:
        The value produced by ``call_next`` on success. If an ``Exception``
        was captured and ``after_consume`` does not raise, ``None``.
    """
    captured: Optional[Exception] = None
    try:
        prepared = await self.on_consume(msg)
        outcome = await call_next(prepared)

    except Exception as exc:
        # Keep the failure so the finally-step can hand it to after_consume.
        captured = exc

    else:
        return outcome

    finally:
        # Runs on success, on failure and when a BaseException propagates.
        await self.after_consume(captured)

on_publish async #

on_publish(msg, *args, **kwargs)

Asynchronously handle a publish event.

Source code in faststream/broker/middlewares/base.py
async def on_publish(self, msg: Any, *args: Any, **kwargs: Any) -> Any:
    """Hook for the publish event; returns the message unchanged.

    Args:
        msg: Message about to be published.
        *args: Extra positional arguments; ignored here.
        **kwargs: Extra keyword arguments; ignored here.

    Returns:
        The very same ``msg`` object that was passed in.
    """
    return msg

after_publish async #

after_publish(err)

Asynchronous function to handle the after publish event.

Source code in faststream/broker/middlewares/base.py
async def after_publish(self, err: Optional[Exception]) -> None:
    """Re-raise the given error, if there is one.

    Args:
        err: Exception to re-raise, or ``None`` to do nothing.

    Raises:
        Exception: ``err`` itself whenever it is not ``None``.
    """
    if err is None:
        return
    raise err

publish_scope async #

publish_scope(call_next, msg, *args, **kwargs)

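Pass the message through on_publish, await call_next with the result (plus the original positional and keyword arguments) and return its value; any Exception raised on the way is captured and handed to after_publish, which is always awaited before returning.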

Source code in faststream/broker/middlewares/base.py
async def publish_scope(
    self,
    call_next: "AsyncFunc",
    msg: Any,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Run the publish pipeline around ``call_next``.

    The message is first passed through ``self.on_publish``; the value it
    returns is handed to ``call_next`` together with the original ``args``
    and ``kwargs``. ``self.after_publish`` is always awaited afterwards,
    receiving the ``Exception`` raised by either step or ``None`` when both
    succeeded.

    Args:
        call_next: Awaitable callable invoked with the ``on_publish`` result.
        msg: Outgoing message forwarded to ``on_publish``.
        *args: Positional arguments forwarded to both calls.
        **kwargs: Keyword arguments forwarded to both calls.

    Returns:
        The value produced by ``call_next`` on success. If an ``Exception``
        was captured and ``after_publish`` does not raise, ``None``.
    """
    captured: Optional[Exception] = None
    try:
        prepared = await self.on_publish(msg, *args, **kwargs)
        outcome = await call_next(prepared, *args, **kwargs)

    except Exception as exc:
        # Keep the failure so the finally-step can hand it to after_publish.
        captured = exc

    else:
        return outcome

    finally:
        # Runs on success, on failure and when a BaseException propagates.
        await self.after_publish(captured)

on_consume async #

on_consume(msg)
Source code in faststream/broker/middlewares/logging.py
async def on_consume(
    self,
    msg: "StreamMessage[Any]",
) -> "StreamMessage[Any]":
    """Log a "Received" record, then delegate to the parent ``on_consume``.

    Nothing is logged when ``self.logger`` is ``None``. Otherwise the record
    is emitted at ``self.log_level`` with the local ``"log_context"`` value
    (an empty dict when unset) passed as ``extra``.

    Args:
        msg: Message being consumed; forwarded to the parent hook.

    Returns:
        Whatever the parent ``on_consume`` returns.
    """
    log = self.logger
    if log is not None:
        extra_fields = context.get_local("log_context", {})
        log.log(self.log_level, "Received", extra=extra_fields)

    handled = await super().on_consume(msg)
    return handled

after_processed async #

after_processed(exc_type=None, exc_val=None, exc_tb=None)

Asynchronously called after processing.

Source code in faststream/broker/middlewares/logging.py
async def after_processed(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> bool:
    """Log the processing outcome, then delegate to the parent hook.

    When ``self.logger`` is set:

    * an exception whose type subclasses ``IgnoredException`` is logged at
      ``INFO`` with the exception value as the message;
    * any other exception is logged at ``ERROR`` as ``"<TypeName>: <value>"``
      with ``exc_info`` attached;
    * a "Processed" record is then emitted at ``self.log_level``.

    Every record carries the local ``"log_context"`` value (an empty dict
    when unset) as ``extra``.

    Args:
        exc_type: Type of the exception raised during processing, if any.
        exc_val: The exception instance, if any.
        exc_tb: Traceback of the exception, if any.

    Returns:
        Always ``False``.
    """
    log = self.logger
    if log is not None:
        extra_fields = context.get_local("log_context", {})
        failed = bool(exc_type)

        if failed and issubclass(exc_type, IgnoredException):
            log.log(logging.INFO, exc_val, extra=extra_fields)
        elif failed:
            log.log(
                logging.ERROR,
                f"{exc_type.__name__}: {exc_val}",
                exc_info=exc_val,
                extra=extra_fields,
            )

        log.log(self.log_level, "Processed", extra=extra_fields)

    await super().after_processed(exc_type, exc_val, exc_tb)

    # Exception was not processed
    return False