Skip to content

BaseMiddleware

faststream.BaseMiddleware #

BaseMiddleware(msg: Any)

A base middleware class.

METHOD DESCRIPTION
on_receive

Called when a message is received.

after_processed

Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exec_tb: Optional[TracebackType] = None) -> Optional[bool]: Called after processing a message.

__aenter__

Called when entering a context.

__aexit__

Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exec_tb: Optional[TracebackType] = None) -> Optional[bool]: Called when exiting a context.

on_consume

DecodedMessage) -> DecodedMessage: Called before consuming a message.

after_consume

Optional[Exception]) -> None: Called after consuming a message.

consume_scope

DecodedMessage) -> AsyncIterator[DecodedMessage]: Context manager for consuming a message.

on_publish

SendableMessage) -> SendableMessage: Called before publishing a message.

after_publish

Optional[Exception]) -> None: Asynchronous function to handle the after publish event.

Initialize the class.

PARAMETER DESCRIPTION
msg

Any message to be stored.

TYPE: Any

Source code in faststream/broker/middlewares.py
def __init__(self, msg: Any) -> None:
    """Initialize the class.

    Args:
        msg: Any message to be stored.
    """
    self.msg = msg

msg instance-attribute #

msg = msg

after_consume async #

after_consume(err: Optional[Exception]) -> None

A function to handle the result of consuming a resource asynchronously.

PARAMETER DESCRIPTION
err

Optional exception that occurred during consumption

RAISES DESCRIPTION
err

If an exception occurred during consumption

Source code in faststream/broker/middlewares.py
async def after_consume(self, err: Optional[Exception]) -> None:
    """A function to handle the result of consuming a resource asynchronously.

    Args:
        err : Optional exception that occurred during consumption

    Raises:
        err : If an exception occurred during consumption
    """
    if err is not None:
        raise err

after_processed async #

after_processed(
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> Optional[bool]

Asynchronously called after processing.

PARAMETER DESCRIPTION
exc_type

Optional exception type

TYPE: Optional[Type[BaseException]] DEFAULT: None

exc_val

Optional exception value

TYPE: Optional[BaseException] DEFAULT: None

exec_tb

Optional traceback

TYPE: Optional[TracebackType] DEFAULT: None

RETURNS DESCRIPTION
Optional[bool]

Optional boolean value indicating whether the processing was successful or not.

Source code in faststream/broker/middlewares.py
async def after_processed(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> Optional[bool]:
    """Asynchronously called after processing.

    Args:
        exc_type: Optional exception type
        exc_val: Optional exception value
        exec_tb: Optional traceback

    Returns:
        Optional boolean value indicating whether the processing was successful or not.
    """
    return False

after_publish async #

after_publish(err: Optional[Exception]) -> None

Asynchronous function to handle the after publish event.

PARAMETER DESCRIPTION
err

Optional exception that occurred during the publish

TYPE: Optional[Exception]

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
Exception

If an error occurred during the publish

Source code in faststream/broker/middlewares.py
async def after_publish(self, err: Optional[Exception]) -> None:
    """Asynchronous function to handle the after publish event.

    Args:
        err: Optional exception that occurred during the publish

    Returns:
        None

    Raises:
        Exception: If an error occurred during the publish
    """
    if err is not None:
        raise err

consume_scope async #

consume_scope(
    msg: DecodedMessage,
) -> AsyncIterator[DecodedMessage]

Asynchronously consumes a message and returns an asynchronous iterator of decoded messages.

PARAMETER DESCRIPTION
msg

The decoded message to consume.

TYPE: DecodedMessage

YIELDS DESCRIPTION
AsyncIterator[DecodedMessage]

An asynchronous iterator of decoded messages.

RETURNS DESCRIPTION
AsyncIterator[DecodedMessage]

An asynchronous iterator of decoded messages.

RAISES DESCRIPTION
Exception

If an error occurs while consuming the message.

AsyncIterator

An asynchronous iterator that yields decoded messages.

Note

This function is an async function.

Source code in faststream/broker/middlewares.py
@asynccontextmanager
async def consume_scope(self, msg: DecodedMessage) -> AsyncIterator[DecodedMessage]:
    """Asynchronously consumes a message and returns an asynchronous iterator of decoded messages.

    Args:
        msg: The decoded message to consume.

    Yields:
        An asynchronous iterator of decoded messages.

    Returns:
        An asynchronous iterator of decoded messages.

    Raises:
        Exception: If an error occurs while consuming the message.

    AsyncIterator:
        An asynchronous iterator that yields decoded messages.

    Note:
        This function is an async function.
    """
    err: Optional[Exception]
    try:
        yield await self.on_consume(msg)
    except Exception as e:
        err = e
    else:
        err = None
    await self.after_consume(err)

on_consume async #

on_consume(msg: DecodedMessage) -> DecodedMessage

Asynchronously consumes a message.

PARAMETER DESCRIPTION
msg

The message to be consumed.

TYPE: DecodedMessage

RETURNS DESCRIPTION
DecodedMessage

The consumed message.

Source code in faststream/broker/middlewares.py
async def on_consume(self, msg: DecodedMessage) -> DecodedMessage:
    """Asynchronously consumes a message.

    Args:
        msg: The message to be consumed.

    Returns:
        The consumed message.
    """
    return msg

on_publish async #

on_publish(msg: SendableMessage) -> SendableMessage

Asynchronously handle a publish event.

PARAMETER DESCRIPTION
msg

The message to be published.

TYPE: SendableMessage

RETURNS DESCRIPTION
SendableMessage

The published message.

Source code in faststream/broker/middlewares.py
async def on_publish(self, msg: SendableMessage) -> SendableMessage:
    """Asynchronously handle a publish event.

    Args:
        msg: The message to be published.

    Returns:
        The published message.
    """
    return msg

on_receive async #

on_receive() -> None
Source code in faststream/broker/middlewares.py
async def on_receive(self) -> None:
    pass

publish_scope async #

publish_scope(
    msg: SendableMessage,
) -> AsyncIterator[SendableMessage]

Publish a message and return an async iterator.

PARAMETER DESCRIPTION
msg

The message to be published.

TYPE: SendableMessage

YIELDS DESCRIPTION
AsyncIterator[SendableMessage]

A sendable message.

RETURNS DESCRIPTION
AsyncIterator[SendableMessage]

An async iterator of sendable messages.

RAISES DESCRIPTION
Exception

If an error occurs during publishing.

Source code in faststream/broker/middlewares.py
@asynccontextmanager
async def publish_scope(
    self, msg: SendableMessage
) -> AsyncIterator[SendableMessage]:
    """Publish a message and return an async iterator.

    Args:
        msg: The message to be published.

    Yields:
        A sendable message.

    Returns:
        An async iterator of sendable messages.

    Raises:
        Exception: If an error occurs during publishing.

    """
    err: Optional[Exception]
    try:
        yield await self.on_publish(msg)
    except Exception as e:
        err = e
    else:
        err = None
    await self.after_publish(err)