Skip to content

OneTryWatcher

faststream.broker.acknowledgement_watcher.OneTryWatcher #

OneTryWatcher(max_tries=0, logger=None)

Bases: BaseWatcher

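A watcher that keeps no per-message state: `add` and `remove` are no-ops, and `is_max` always returns `True`, so every message is treated as having already used its only try.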

Source code in faststream/broker/acknowledgement_watcher.py
def __init__(
    self,
    max_tries: int = 0,
    logger: Optional["LoggerProto"] = None,
) -> None:
    """Store the configured try limit and optional logger on the instance.

    Args:
        max_tries: Value kept as ``self.max_tries``. Defaults to 0.
        logger: Value kept as ``self.logger``. Defaults to None.
    """
    # The two attributes are independent, so they are bound in one statement.
    self.max_tries, self.logger = max_tries, logger

max_tries instance-attribute #

max_tries = max_tries

logger instance-attribute #

logger = logger

add #

add(message_id)

Add a message.

Source code in faststream/broker/acknowledgement_watcher.py
def add(self, message_id: str) -> None:
    """Accept a message ID without recording anything.

    Args:
        message_id: Identifier of the message; it is not used.
    """
    # Intentional no-op: nothing is stored for the given ID.
    return None

is_max #

is_max(message_id)

Check whether the given message has reached its maximum number of attempts.

Source code in faststream/broker/acknowledgement_watcher.py
def is_max(self, message_id: str) -> bool:
    """Report whether the message has reached its attempt limit.

    Args:
        message_id: Identifier of the message; it is not consulted.

    Returns:
        Always ``True``, regardless of ``message_id``.
    """
    # The answer is constant: no per-message state is looked up here.
    limit_reached = True
    return limit_reached

remove #

remove(message_id)

Remove a message.

Source code in faststream/broker/acknowledgement_watcher.py
def remove(self, message_id: str) -> None:
    """Accept a message ID for removal without doing anything.

    Args:
        message_id: Identifier of the message; it is not used.
    """
    # Intentional no-op: there is no stored state to clear.
    return None