Apply a pattern to a routing key.
Source code in faststream/rabbit/test.py
| def apply_pattern(pattern: str, current: str) -> bool:
"""Apply a pattern to a routing key."""
pattern_queue = iter(pattern.split("."))
current_queue = iter(current.split("."))
pattern_symb = next(pattern_queue, None)
while pattern_symb:
if (next_symb := next(current_queue, None)) is None:
return False
elif pattern_symb == "#":
next_pattern = next(pattern_queue, None)
if next_pattern is None:
return True
if (next_symb := next(current_queue, None)) is None:
return False
while next_pattern == "*":
next_pattern = next(pattern_queue, None)
if (next_symb := next(current_queue, None)) is None:
return False
while next_symb != next_pattern:
if (next_symb := next(current_queue, None)) is None:
return False
pattern_symb = next(pattern_queue, None)
elif pattern_symb == "*" or pattern_symb == next_symb:
pattern_symb = next(pattern_queue, None)
else:
return False
return next(current_queue, None) is None
|