Skip to content

BaseWatcher

faststream.broker.push_back_watcher.BaseWatcher #

BaseWatcher(max_tries: int = 0, logger: Optional[Logger] = None)

Bases: ABC

A base class for a watcher.

PARAMETER DESCRIPTION
max_tries

maximum number of tries allowed (default=0)

DEFAULT: 0

logger

logger object (optional)

DEFAULT: None

METHOD DESCRIPTION
add

add a message to the watcher

is_max

check if the maximum number of tries has been reached for a message

remove

remove a message from the watcher

Initialize the class.

PARAMETER DESCRIPTION
max_tries

Maximum number of tries allowed

TYPE: int DEFAULT: 0

logger

Optional logger object

TYPE: Optional[Logger] DEFAULT: None

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented in the subclass.

Source code in faststream/broker/push_back_watcher.py
def __init__(
    self,
    max_tries: int = 0,
    logger: Optional[Logger] = None,
) -> None:
    """Initialize the class.

    Args:
        max_tries: Maximum number of tries allowed
        logger: Optional logger object

    Raises:
        NotImplementedError: If the method is not implemented in the subclass.

    """
    self.logger = logger
    self.max_tries = max_tries

logger instance-attribute #

logger = logger

max_tries instance-attribute #

max_tries: int = max_tries

add abstractmethod #

add(message_id: str) -> None

Add a message.

PARAMETER DESCRIPTION
message_id

ID of the message to be added

TYPE: str

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented

Source code in faststream/broker/push_back_watcher.py
@abstractmethod
def add(self, message_id: str) -> None:
    """Add a message.

    Args:
        message_id: ID of the message to be added

    Returns:
        None

    Raises:
        NotImplementedError: If the method is not implemented

    """
    raise NotImplementedError()

is_max abstractmethod #

is_max(message_id: str) -> bool

Check if the given message ID is the maximum.

PARAMETER DESCRIPTION
message_id

The ID of the message to check.

TYPE: str

RETURNS DESCRIPTION
bool

True if the given message ID is the maximum, False otherwise.

RAISES DESCRIPTION
NotImplementedError

This method is meant to be overridden by subclasses.

Source code in faststream/broker/push_back_watcher.py
@abstractmethod
def is_max(self, message_id: str) -> bool:
    """Check if the given message ID is the maximum.

    Args:
        message_id: The ID of the message to check.

    Returns:
        True if the given message ID is the maximum, False otherwise.

    Raises:
        NotImplementedError: This method is meant to be overridden by subclasses.

    """
    raise NotImplementedError()

remove abstractmethod #

remove(message_id: str) -> None

Remove a message.

PARAMETER DESCRIPTION
message_id

ID of the message to be removed

TYPE: str

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented

Source code in faststream/broker/push_back_watcher.py
@abstractmethod
def remove(self, message_id: str) -> None:
    """Remove a message.

    Args:
        message_id: ID of the message to be removed

    Returns:
        None

    Raises:
        NotImplementedError: If the method is not implemented

    """
    raise NotImplementedError()