Skip to content

WatchReloader

faststream.cli.supervisors.watchfiles.WatchReloader #

WatchReloader(
    target: DecoratedCallable,
    args: Tuple[Any, ...],
    reload_dirs: Sequence[Union[Path, str]],
    reload_delay: float = 0.3,
    extra_extensions: Sequence[str] = (),
)

Bases: BaseReload

A class to reload a target function when files in specified directories change.

Initialize a WatchReloader object.

PARAMETER DESCRIPTION
target

the function to be reloaded

args

arguments to be passed to the target function

reload_dirs

directories to watch for file changes

reload_delay

delay in seconds between each check for file changes

DEFAULT: 0.3

extra_extensions

A sequence of extra extensions to include.

TYPE: Sequence[str] DEFAULT: ()

RETURNS DESCRIPTION
None

None.

Source code in faststream/cli/supervisors/watchfiles.py
def __init__(
    self,
    target: DecoratedCallable,
    args: Tuple[Any, ...],
    reload_dirs: Sequence[Union[Path, str]],
    reload_delay: float = 0.3,
    extra_extensions: Sequence[str] = (),
) -> None:
    """Set up the reloader and the file watcher it polls.

    Args:
        target: the function to be reloaded
        args: arguments to be passed to the target function
        reload_dirs: directories to watch for file changes
        reload_delay: delay in seconds between each check for file changes;
            it is also handed to the watcher as ``step`` after conversion
            to whole milliseconds
        extra_extensions: a sequence of extra extensions to include,
            forwarded to ``ExtendedFilter``

    """
    super().__init__(target, args, reload_delay)
    self.reload_dirs = reload_dirs
    self.reloader_name = "WatchFiles"

    # The watcher takes its step in milliseconds, the reloader API in seconds.
    step_ms = int(reload_delay * 1000)
    change_filter = ExtendedFilter(extra_extensions=extra_extensions)
    self.watcher = watchfiles.watch(
        *reload_dirs,
        step=step_ms,
        watch_filter=change_filter,
        # Setting should_exit is what stops the watcher.
        stop_event=self.should_exit,
        yield_on_timeout=True,
    )

pid instance-attribute #

pid: int = getpid()

reload_delay instance-attribute #

reload_delay: Optional[float] = reload_delay

reload_dirs instance-attribute #

reload_dirs = reload_dirs

reloader_name instance-attribute #

reloader_name = 'WatchFiles'

should_exit instance-attribute #

should_exit: Event = Event()

watcher instance-attribute #

watcher = watch(
    *reload_dirs,
    step=int(reload_delay * 1000),
    watch_filter=ExtendedFilter(
        extra_extensions=extra_extensions
    ),
    stop_event=should_exit,
    yield_on_timeout=True
)

restart #

restart() -> None
Source code in faststream/cli/supervisors/basereload.py
def restart(self) -> None:
    """Replace the supervised process with a freshly started one.

    Stops the current process, logs the reload, and stores whatever
    ``_start_process`` returns on ``self._process``.
    """
    self._stop_process()
    logger.info("Process successfully reloaded")
    fresh_process = self._start_process()
    self._process = fresh_process

run #

run() -> None
Source code in faststream/cli/supervisors/basereload.py
def run(self) -> None:
    """Drive the reload loop from startup to shutdown.

    Calls ``startup``, then repeatedly waits on ``should_exit`` for
    ``reload_delay``; after each wait that comes back falsy it restarts
    the process if ``should_restart`` says so. Once the wait comes back
    truthy the loop ends and ``shutdown`` is called.
    """
    self.startup()
    while True:
        # wait() doubles as the polling delay; a truthy result ends the loop.
        if self.should_exit.wait(self.reload_delay):
            break
        if self.should_restart():  # pragma: no branch
            self.restart()
    self.shutdown()

should_restart #

should_restart() -> bool
Source code in faststream/cli/supervisors/watchfiles.py
def should_restart(self) -> bool:
    """Report whether the watcher has seen a file change.

    Iterates ``self.watcher`` and skips empty batches. On the first
    non-empty batch it logs the distinct file names involved and
    returns.

    Returns:
        True as soon as a non-empty batch of changes is received; False
        if the watcher is exhausted without reporting any change.

    """
    for changes in self.watcher:  # pragma: no branch
        if changes:  # pragma: no branch
            # Element 1 of each change is used as the changed path; only
            # the file name is reported, de-duplicated across the batch.
            unique_paths = {Path(c[1]).name for c in changes}
            # A batch can touch several files, so join the names into one
            # string: formatting a multi-element tuple into the single
            # '%s' placeholder raises TypeError. Sorting keeps the log
            # line deterministic.
            logger.info(
                "WatchReloader detected file change in '%s'. Reloading...",
                ", ".join(sorted(unique_paths)),
            )
            return True
    return False  # pragma: no cover

shutdown #

shutdown() -> None
Source code in faststream/cli/supervisors/basereload.py
def shutdown(self) -> None:
    """Stop the supervised process, then log the reloader's pid."""
    self._stop_process()
    farewell = f"Stopping reloader process [{self.pid}]"
    logger.info(farewell)

startup #

startup() -> None
Source code in faststream/cli/supervisors/watchfiles.py
def startup(self) -> None:
    """Log the watched directories, then run the base class startup."""
    announcement = f"Will watch for changes in these directories: {self.reload_dirs}"
    logger.info(announcement)
    super().startup()