Skip to content

get_watcher_context

faststream.broker.utils.get_watcher_context #

get_watcher_context(logger, no_ack, retry, **extra_options)

Create Acknowledgement scope.

Source code in faststream/broker/utils.py
def get_watcher_context(
    logger: Optional["LoggerProto"],
    no_ack: bool,
    retry: Union[bool, int],
    **extra_options: Any,
) -> Callable[..., AsyncContextManager[None]]:
    """Create Acknowledgement scope."""
    if no_ack:
        return fake_context

    else:
        return partial(
            WatcherContext,
            watcher=get_watcher(logger, retry),
            logger=logger,
            **extra_options,
        )