Bases: BaseWatcher
A class to watch and track the count of messages.
PARAMETER | DESCRIPTION |
max_tries | Maximum number of tries allowed. TYPE: int DEFAULT: 3 |
logger | Logger object for logging messages. TYPE: Optional[Logger] DEFAULT: None |
METHOD | DESCRIPTION |
add | (message_id: str) -> None - adds a message to the counter |
is_max | (message_id: str) -> bool - checks whether the count of a message has exceeded the maximum tries |
remove | (message: str) -> None - removes a message from the counter |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
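To make the roles of the three methods concrete, here is a minimal sketch of one way a retry flow could use them. `SketchCounterWatcher` is a hypothetical stand-in that copies only the counting logic shown on this page (no logger, no `BaseWatcher`), and `process` with its "acked" / "requeued" / "rejected" return values is an assumption made for illustration, not FastStream's actual handling code.

```python
from collections import Counter


class SketchCounterWatcher:
    """Hypothetical stand-in that mirrors the add / is_max / remove logic."""

    def __init__(self, max_tries: int = 3) -> None:
        self.max_tries = max_tries
        self.memory = Counter()

    def add(self, message_id: str) -> None:
        self.memory[message_id] += 1

    def is_max(self, message_id: str) -> bool:
        return self.memory[message_id] > self.max_tries

    def remove(self, message_id: str) -> None:
        self.memory[message_id] = 0
        self.memory += Counter()


def process(watcher, message_id, handler):
    """Hypothetical single delivery attempt for one message."""
    watcher.add(message_id)  # count this attempt
    try:
        handler()
    except Exception:
        if watcher.is_max(message_id):
            watcher.remove(message_id)  # give up and forget the message
            return "rejected"
        return "requeued"  # keep the count for the next delivery
    watcher.remove(message_id)  # success: forget the message
    return "acked"


def always_fails():
    raise RuntimeError("handler failed")


watcher = SketchCounterWatcher(max_tries=2)
results = [process(watcher, "msg-1", always_fails) for _ in range(3)]
```

With `max_tries=2` and a handler that always fails, the first two attempts are requeued and the third is rejected, after which the message ID is no longer tracked.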
Initialize the class.
PARAMETER | DESCRIPTION |
max_tries | Maximum number of tries. TYPE: int DEFAULT: 3 |
logger | Logger object. TYPE: Optional[Logger] DEFAULT: None |
Source code in faststream/broker/push_back_watcher.py
def __init__(
    self,
    max_tries: int = 3,
    logger: Optional[Logger] = None,
):
    """Initialize the class.

    Args:
        max_tries (int): maximum number of tries
        logger (Optional[Logger]): logger object (default: None)

    !!! note
        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    super().__init__(logger=logger, max_tries=max_tries)
    self.memory = Counter()
logger instance-attribute
max_tries instance-attribute
max_tries: int = max_tries
memory instance-attribute
memory: CounterType[str] = Counter()
add
add(message_id: str) -> None
Increments the count of a message in the memory.
PARAMETER | DESCRIPTION |
message_id | The ID of the message to be incremented. TYPE: str |
Source code in faststream/broker/push_back_watcher.py
def add(self, message_id: str) -> None:
    """Increments the count of a message in the memory.

    Args:
        message_id: The ID of the message to be incremented.

    Returns:
        None

    !!! note
        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    self.memory[message_id] += 1
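The increment in `add` needs no prior initialization because `collections.Counter` treats a missing key as a count of zero. A small sketch of that behavior using a plain `Counter` (the message IDs are made up for illustration):

```python
from collections import Counter

memory = Counter()
memory["msg-1"] += 1  # first attempt: the missing key counts as 0, so this stores 1
memory["msg-1"] += 1  # second attempt

# Reading an ID that was never added returns 0 and does not store the key.
unseen = memory["msg-2"]
```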
is_max
is_max(message_id: str) -> bool
Check if the number of tries for a message has exceeded the maximum allowed tries.
PARAMETER | DESCRIPTION |
message_id | The ID of the message. TYPE: str |
RETURNS | DESCRIPTION |
bool | True if the number of tries has exceeded the maximum allowed tries, False otherwise |
Source code in faststream/broker/push_back_watcher.py
def is_max(self, message_id: str) -> bool:
    """Check if the number of tries for a message has exceeded the maximum allowed tries.

    Args:
        message_id: The ID of the message

    Returns:
        True if the number of tries has exceeded the maximum allowed tries, False otherwise

    !!! note
        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    is_max = self.memory[message_id] > self.max_tries
    if self.logger is not None:
        if is_max:
            self.logger.error(f"Already retried {self.max_tries} times. Skipped.")
        else:
            self.logger.error("Error is occured. Pushing back to queue.")
    return is_max
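Because the comparison is a strict `>`, a message only counts as maxed out once its count goes past `max_tries`, not when it equals it. The sketch below reproduces just that comparison with a plain `Counter`; the loop and the `results` list are illustrative and not part of the library.

```python
from collections import Counter

max_tries = 3
memory = Counter()

results = []
for attempt in range(1, 6):
    memory["msg-1"] += 1  # what add() does
    # Same comparison as is_max(): strictly greater than max_tries.
    results.append((attempt, memory["msg-1"] > max_tries))
```

With the default `max_tries` of 3, attempts 1 to 3 yield `False` and the fourth attempt is the first to yield `True`.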
remove
remove(message: str) -> None
Remove a message from memory.
PARAMETER | DESCRIPTION |
message | The ID of the message to be removed. TYPE: str |
Source code in faststream/broker/push_back_watcher.py
def remove(self, message: str) -> None:
    """Remove a message from memory.

    Args:
        message: The message to be removed.

    Returns:
        None

    !!! note
        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    self.memory[message] = 0
    self.memory += Counter()
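The two statements in `remove` work together: setting the count to zero leaves the key in place, and the in-place addition of an empty `Counter` then discards every entry whose count is not positive. A short sketch with a plain `Counter` (the IDs and counts are made up for illustration):

```python
from collections import Counter

memory = Counter({"msg-1": 4, "msg-2": 1})

memory["msg-1"] = 0
still_present = "msg-1" in memory  # zeroing alone keeps the key

memory += Counter()  # in-place addition drops entries with a count <= 0
```

A side effect of this idiom is that any other entry with a non-positive count would be dropped in the same step.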