BaseReload(
target: DecoratedCallable,
args: Tuple[Any, ...],
reload_delay: Optional[float] = 0.5,
)
A base class for implementing a reloader process.
METHOD | DESCRIPTION |
run | Runs the reloader process. |
startup | Performs startup operations for the reloader process. |
restart | Stops the spawned process and starts a new one. |
shutdown | Shuts down the reloader process. |
_stop_process | Stops the spawned process. |
_start_process | Starts the spawned process. |
should_restart | Determines whether the process should be restarted. |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Initialize a class instance.
PARAMETER | DESCRIPTION |
target | The target callable object TYPE: DecoratedCallable |
args | Tuple of arguments to be passed to the target callable TYPE: Tuple[Any, ...] |
reload_delay | Optional delay in seconds before reloading the target callable (default is 0.5 seconds) TYPE: Optional[float] DEFAULT: 0.5 |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/cli/supervisors/basereload.py
def __init__(
    self,
    target: DecoratedCallable,
    args: Tuple[Any, ...],
    reload_delay: Optional[float] = 0.5,
) -> None:
    """Store the reloader configuration and register the exit callback.

    Args:
        target: The target callable object; kept on the instance as ``_target``.
        args: Tuple of arguments to be passed to the target callable; kept
            on the instance as ``_args``.
        reload_delay: Delay in seconds between restart checks. ``run`` uses
            it as the timeout of ``should_exit.wait`` (default is 0.5 seconds).
    """

    def _request_exit(*_: Any) -> None:
        # Whatever arguments the caller supplies are ignored; the only
        # effect is to flip the exit flag polled by ``run``.
        self.should_exit.set()

    self.pid = os.getpid()
    self.should_exit = threading.Event()
    self.reload_delay = reload_delay
    self._args = args
    self._target = target

    # NOTE(review): set_exit is defined outside this block; presumably it
    # installs the callback for termination signals - confirm.
    set_exit(_request_exit)
|
reload_delay instance-attribute
reload_delay: Optional[float] = reload_delay
reloader_name class-attribute
instance-attribute
should_exit instance-attribute
should_exit: threading.Event = threading.Event()
restart
Source code in faststream/cli/supervisors/basereload.py
def restart(self) -> None:
    """Stop the current spawned process and start a replacement.

    The replacement returned by ``_start_process`` is stored on
    ``self._process``.
    """
    self._stop_process()
    # NOTE(review): this message is emitted before the new process has
    # actually been started; the ordering is kept as-is.
    logger.info("Process successfully reloaded")
    replacement = self._start_process()
    self._process = replacement
|
run
Source code in faststream/cli/supervisors/basereload.py
def run(self) -> None:
    """Run the reloader loop until an exit is requested.

    Calls ``startup`` once, then repeatedly waits on ``should_exit`` with
    ``reload_delay`` as the timeout. After every wait that times out,
    ``should_restart`` is consulted and ``restart`` is called when it
    returns a truthy value. The loop ends as soon as ``should_exit`` is set.

    ``shutdown`` is always called once ``startup`` has completed, including
    when ``should_restart`` or ``restart`` raises, so the spawned process is
    not left running. The exception still propagates to the caller.

    Note:
        ``threading.Event.wait(None)`` blocks without a timeout, so a
        ``reload_delay`` of ``None`` means no restart checks happen; the
        call simply returns once ``should_exit`` is set.
    """
    # A failure in startup() propagates without calling shutdown(): nothing
    # has been started yet at that point.
    self.startup()
    try:
        while not self.should_exit.wait(self.reload_delay):
            if self.should_restart():  # pragma: no branch
                self.restart()
    finally:
        # Runs on normal exit and on error alike.
        self.shutdown()
|
should_restart
Source code in faststream/cli/supervisors/basereload.py
def should_restart(self) -> bool:
    """Report whether the spawned process should be restarted.

    This base implementation never returns; reload strategies are expected
    to override it.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    message = "Reload strategies should override should_restart()"
    raise NotImplementedError(message)
|
shutdown
Source code in faststream/cli/supervisors/basereload.py
def shutdown(self) -> None:
    """Stop the spawned process, then log that the reloader is stopping.

    The log line includes the reloader's own process id (``self.pid``).
    """
    self._stop_process()
    farewell = f"Stopping reloader process [{self.pid}]"
    logger.info(farewell)
|
startup
Source code in faststream/cli/supervisors/basereload.py
def startup(self) -> None:
    """Log the reloader start and launch the spawned process.

    The log line includes the reloader's process id (``self.pid``) and
    ``self.reloader_name``. The object returned by ``_start_process`` is
    stored on ``self._process``.
    """
    banner = f"Started reloader process [{self.pid}] using {self.reloader_name}"
    logger.info(banner)
    spawned = self._start_process()
    self._process = spawned
|