Skip to content

CounterWatcher

faststream.broker.push_back_watcher.CounterWatcher #

CounterWatcher(
    max_tries: int = 3, logger: Optional[Logger] = None
)

Bases: BaseWatcher

A class to watch and track the count of messages.

PARAMETER DESCRIPTION
max_tries

int - maximum number of tries allowed

DEFAULT: 3

logger

Optional[Logger] - logger object for logging messages

DEFAULT: None

METHOD DESCRIPTION
add

(message_id: str) -> None - adds a message to the counter

is_max

(message_id: str) -> bool - checks if the count of a message has exceeded the maximum tries

remove

(message: str) -> None - removes a message from the counter

Initialize the class.

PARAMETER DESCRIPTION
max_tries

maximum number of tries

TYPE: int DEFAULT: 3

logger

logger object (default: None)

TYPE: Optional[Logger] DEFAULT: None

Source code in faststream/broker/push_back_watcher.py
def __init__(
    self,
    max_tries: int = 3,
    logger: Optional[Logger] = None,
) -> None:
    """Create a watcher with an empty per-message attempt counter.

    Args:
        max_tries: Retry limit, forwarded to the base initializer.
        logger: Optional logger, forwarded to the base initializer.

    """
    # The base initializer runs first; the counter storage is created afterwards.
    super().__init__(max_tries=max_tries, logger=logger)
    self.memory = Counter()

logger instance-attribute #

logger = logger

max_tries instance-attribute #

max_tries: int = max_tries

memory instance-attribute #

memory: Counter[str] = Counter()

add #

add(message_id: str) -> None

Increments the count of a message in the memory.

PARAMETER DESCRIPTION
message_id

The ID of the message to be incremented.

TYPE: str

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/push_back_watcher.py
def add(self, message_id: str) -> None:
    """Record one more attempt for the given message.

    Args:
        message_id: Key of the counter entry to bump by one.

    """
    attempts = self.memory[message_id]
    self.memory[message_id] = attempts + 1

is_max #

is_max(message_id: str) -> bool

Check if the number of tries for a message has exceeded the maximum allowed tries.

PARAMETER DESCRIPTION
message_id

The ID of the message

TYPE: str

RETURNS DESCRIPTION
bool

True if the number of tries has exceeded the maximum allowed tries, False otherwise

Source code in faststream/broker/push_back_watcher.py
def is_max(self, message_id: str) -> bool:
    """Tell whether a message's attempt count is above ``max_tries``.

    When a logger is set, one error-level record is emitted on every call:
    a "skipped" message if the limit is exceeded, a "pushing back" message
    otherwise.

    Args:
        message_id: Key of the counter entry to inspect.

    Returns:
        True if the stored count is strictly greater than ``max_tries``,
        False otherwise.

    """
    exceeded = self.memory[message_id] > self.max_tries
    if self.logger is not None:
        # Pick the text first so there is a single logging call site.
        text = (
            f"Already retried {self.max_tries} times. Skipped."
            if exceeded
            else "Error is occurred. Pushing back to queue."
        )
        self.logger.error(text)
    return exceeded

remove #

remove(message: str) -> None

Remove a message from memory.

PARAMETER DESCRIPTION
message

The message to be removed.

TYPE: str

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/push_back_watcher.py
def remove(self, message: str) -> None:
    """Forget the attempt count stored for a message.

    Removing a key that is not present is a no-op.

    Args:
        message: Key of the counter entry to drop (the same kind of key the
            other methods receive as ``message_id``).

    """
    # Drop just this key instead of zeroing it and then pruning the whole
    # counter: constant-time, and entries for other messages are untouched.
    self.memory.pop(message, None)