Skip to content

BaseWatcher

faststream.broker.acknowledgement_watcher.BaseWatcher #

BaseWatcher(max_tries=0, logger=None)

Bases: ABC

A base class for a watcher.

Source code in faststream/broker/acknowledgement_watcher.py
def __init__(
    self,
    max_tries: int = 0,
    logger: Optional["LoggerProto"] = None,
) -> None:
    """Store the watcher's configuration on the instance.

    Args:
        max_tries: Value saved as ``self.max_tries``. Defaults to 0.
        logger: Optional logger saved as ``self.logger``. Defaults to None.
    """
    # Assigned left to right, so ``logger`` is still set before ``max_tries``.
    self.logger, self.max_tries = logger, max_tries

logger instance-attribute #

logger = logger

max_tries instance-attribute #

max_tries = max_tries

add abstractmethod #

add(message_id)

Add a message.

Source code in faststream/broker/acknowledgement_watcher.py
@abstractmethod
def add(self, message_id: str) -> None:
    """Add a message, identified by ``message_id``.

    This is an abstract hook: the base version has no behavior of its own
    and concrete watchers are expected to override it.

    Args:
        message_id: Identifier of the message to add.

    Raises:
        NotImplementedError: Always, when the base implementation is called.
    """
    raise NotImplementedError

is_max abstractmethod #

is_max(message_id)

Check whether the message with the given ID has reached its maximum attempt.

Source code in faststream/broker/acknowledgement_watcher.py
@abstractmethod
def is_max(self, message_id: str) -> bool:
    """Report whether the given message is at its maximum attempt.

    This is an abstract hook: the base version has no behavior of its own
    and concrete watchers are expected to override it.
    NOTE(review): presumably compared against ``max_tries`` by subclasses;
    confirm against the concrete implementations.

    Args:
        message_id: Identifier of the message to check.

    Raises:
        NotImplementedError: Always, when the base implementation is called.
    """
    raise NotImplementedError

remove abstractmethod #

remove(message_id)

Remove a message.

Source code in faststream/broker/acknowledgement_watcher.py
@abstractmethod
def remove(self, message_id: str) -> None:
    """Remove a message, identified by ``message_id``.

    This is an abstract hook: the base version has no behavior of its own
    and concrete watchers are expected to override it.

    Args:
        message_id: Identifier of the message to remove.

    Raises:
        NotImplementedError: Always, when the base implementation is called.
    """
    raise NotImplementedError