Skip to content

CounterWatcher

faststream.broker.acknowledgement_watcher.CounterWatcher #

CounterWatcher(max_tries=3, logger=None)

Bases: BaseWatcher

A watcher that counts processing attempts per message ID and reports when a message has exceeded the maximum number of tries.

Source code in faststream/broker/acknowledgement_watcher.py
def __init__(
    self,
    max_tries: int = 3,
    logger: Optional["LoggerProto"] = None,
) -> None:
    """Set up the watcher with an empty per-message attempt counter.

    Args:
        max_tries: Retry limit forwarded to the base watcher; ``is_max``
            compares the stored attempt count against it.
        logger: Optional logger forwarded to the base watcher.
    """
    super().__init__(max_tries=max_tries, logger=logger)
    # One Counter per instance; IDs that were never added read back as 0.
    self.memory = Counter()

max_tries instance-attribute #

max_tries = max_tries

logger instance-attribute #

logger = logger

memory instance-attribute #

memory = Counter()

add #

add(message_id)

Increment the attempt counter for the given message ID.

Source code in faststream/broker/acknowledgement_watcher.py
def add(self, message_id: str) -> None:
    """Record one more processing attempt for ``message_id``.

    An ID that has not been seen before starts from zero, so its first
    call stores a count of 1.
    """
    attempts_so_far = self.memory[message_id]
    self.memory[message_id] = attempts_so_far + 1

is_max #

is_max(message_id)

Check if the number of tries for a message has exceeded the maximum allowed tries.

Source code in faststream/broker/acknowledgement_watcher.py
def is_max(self, message_id: str) -> bool:
    """Tell whether ``message_id`` has been attempted more than ``max_tries`` times.

    When a logger is configured, one ERROR-level record is emitted on every
    call: a "skipped" notice if the limit is exceeded, otherwise a
    "pushing back to queue" notice.

    Returns:
        True if the stored attempt count is strictly greater than
        ``self.max_tries``, False otherwise.
    """
    exceeded = self.memory[message_id] > self.max_tries

    # No logger configured: nothing to report, just hand back the verdict.
    if self.logger is None:
        return exceeded

    if exceeded:
        text = f"Already retried {self.max_tries} times. Skipped."
    else:
        text = "Error is occurred. Pushing back to queue."
    self.logger.log(logging.ERROR, text)
    return exceeded

remove #

remove(message_id)

Remove a message from memory.

Source code in faststream/broker/acknowledgement_watcher.py
def remove(self, message_id: str) -> None:
    """Forget the attempt count stored for ``message_id``.

    IDs that were never added are ignored, so the call is a no-op for them.
    """
    # Drop the single key directly. Zeroing the entry and then running
    # `+= Counter()` rescanned every tracked ID on each call just to purge
    # one entry.
    self.memory.pop(message_id, None)