Skip to content

MultiLock

faststream.broker.utils.MultiLock #

MultiLock()

A class representing a multi lock.

Initialize a new instance of the class.

Source code in faststream/broker/utils.py
def __init__(self) -> None:
    """Set up the lock with a fresh, empty ``asyncio.Queue``.

    Each item placed in ``self.queue`` stands for one holder of the lock.
    """
    # The queue is created with default arguments; only the number of
    # queued items matters, so every item is simply ``None``.
    holders: asyncio.Queue[None] = asyncio.Queue()
    self.queue = holders

queue instance-attribute #

queue = Queue()

qsize property #

qsize

Return the size of the queue.

empty property #

empty

Return whether the queue is empty.

acquire #

acquire()

Acquire lock.

Source code in faststream/broker/utils.py
def acquire(self) -> None:
    """Register one more holder of the lock.

    A ``None`` marker is pushed onto ``self.queue`` without waiting, so the
    queue size grows by one for every call.
    """
    holders = self.queue
    holders.put_nowait(None)

release #

release()

Release lock.

Source code in faststream/broker/utils.py
def release(self) -> None:
    """Drop one holder of the lock, if there is one.

    One marker is taken from ``self.queue`` and reported as done.
    ``asyncio.QueueEmpty`` and ``ValueError`` raised by either step are
    suppressed, so calling this on an empty lock is a no-op.
    """
    holders = self.queue
    with suppress(asyncio.QueueEmpty, ValueError):
        # If ``get_nowait`` raises, ``task_done`` is skipped as well.
        holders.get_nowait()
        holders.task_done()

wait_release async #

wait_release(timeout=None)

Wait for the queue to be released.

Used for graceful shutdown.

Source code in faststream/broker/utils.py
async def wait_release(self, timeout: Optional[float] = None) -> None:
    """Wait until every acquired lock has been released.

    Used for graceful shutdown.

    Args:
        timeout: Upper bound, in seconds, on how long to wait. When it is
            falsy (``None`` or ``0``) the method returns immediately
            without waiting at all.
    """
    if not timeout:
        # No timeout given: skip waiting entirely.
        return

    # Stop waiting once ``timeout`` seconds have passed, even if some
    # holders have not released yet.
    with anyio.move_on_after(timeout):
        await self.queue.join()