WatchReloader(target, args, reload_dirs, reload_delay=0.3, extra_extensions=())
Bases: BaseReload
A class to reload a target function when files in specified directories change.
Source code in faststream/cli/supervisors/watchfiles.py
def __init__(
    self,
    target: "DecoratedCallable",
    args: Tuple[Any, ...],
    reload_dirs: Sequence[Union[Path, str]],
    reload_delay: float = 0.3,
    extra_extensions: Sequence[str] = (),
) -> None:
    """Configure the reloader and create the file watcher.

    Args:
        target: Callable forwarded to the base reloader.
        args: Positional arguments forwarded to the base reloader together
            with ``target``.
        reload_dirs: Paths handed to ``watchfiles.watch`` as the locations
            to observe; also kept on the instance as ``reload_dirs``.
        reload_delay: Delay forwarded to the base reloader. It is also
            multiplied by 1000 and truncated to an ``int`` to form the
            watcher's ``step`` argument.
        extra_extensions: Extensions forwarded to ``ExtendedFilter``, which
            is used as the watcher's ``watch_filter``.
    """
    # The base initialiser runs first; ``self.should_exit`` is read below.
    super().__init__(target, args, reload_delay)

    self.reload_dirs = reload_dirs
    self.reloader_name = "WatchFiles"

    step_ms = int(reload_delay * 1000)
    change_filter = ExtendedFilter(extra_extensions=extra_extensions)
    self.watcher = watchfiles.watch(
        *reload_dirs,
        step=step_ms,
        watch_filter=change_filter,
        stop_event=self.should_exit,
        yield_on_timeout=True,
    )
reload_delay instance-attribute
should_exit instance-attribute
reloader_name instance-attribute
reloader_name = 'WatchFiles'
reload_dirs instance-attribute
watcher instance-attribute
watcher = watch(*reload_dirs, step=int(reload_delay * 1000), watch_filter=ExtendedFilter(extra_extensions=extra_extensions), stop_event=should_exit, yield_on_timeout=True)
run
Source code in faststream/cli/supervisors/basereload.py
def run(self) -> None:
    """Drive the reloader from startup to shutdown.

    After ``startup()``, the loop waits on ``should_exit`` for
    ``reload_delay`` per iteration. While the wait returns a falsy value,
    ``should_restart()`` is consulted and ``restart()`` is called when it
    reports ``True``. Once the wait returns a truthy value the loop ends
    and ``shutdown()`` is called.
    """
    self.startup()
    while True:
        if self.should_exit.wait(self.reload_delay):
            break
        if self.should_restart():  # pragma: no branch
            self.restart()
    self.shutdown()
restart
Source code in faststream/cli/supervisors/basereload.py
def restart(self) -> None:
    """Stop the current process and start a replacement.

    The "Process successfully reloaded" message is logged after the old
    process has been stopped and before the new one is started; the newly
    started process is stored on ``self._process``.
    """
    self._stop_process()
    logger.info("Process successfully reloaded")
    replacement = self._start_process()
    self._process = replacement
shutdown
Source code in faststream/cli/supervisors/basereload.py
def shutdown(self) -> None:
    """Stop the managed process, then log that the reloader is stopping."""
    self._stop_process()
    # Pass the pid as a logging argument (as the other log calls here do)
    # so the message is only formatted when INFO records are emitted; the
    # rendered text is unchanged.
    logger.info("Stopping reloader process [%s]", self.pid)
startup
Source code in faststream/cli/supervisors/watchfiles.py
def startup(self) -> None:
    """Log the watched directories, then run the base class ``startup``."""
    watched = list(map(str, self.reload_dirs))
    logger.info("Will watch for changes in these directories: %s", watched)
    super().startup()
should_restart
Source code in faststream/cli/supervisors/watchfiles.py
def should_restart(self) -> bool:
    """Report whether the watcher has observed a file change.

    Iterates ``self.watcher`` until it yields a non-empty batch of changes,
    logs the affected file names and returns ``True``. Empty batches are
    skipped. ``False`` is returned only if the watcher is exhausted without
    yielding any changes.

    Returns:
        ``True`` when a change was detected, ``False`` otherwise.
    """
    for changes in self.watcher:  # pragma: no branch
        if changes:  # pragma: no branch
            # Each change is indexed as ``change[1]`` for its path; only the
            # final path component is reported. Names are de-duplicated and
            # sorted so the log line is stable from run to run (plain set
            # iteration order is not).
            changed_names = sorted({Path(change[1]).name for change in changes})
            logger.info(
                "WatchReloader detected file change in '%s'. Reloading...",
                ", ".join(changed_names),
            )
            return True
    return False  # pragma: no cover