def__init__(self,target:DecoratedCallable,args:Tuple[Any,...],reload_delay:Optional[float]=0.5,)->None:"""Initialize a class instance. Args: target: The target callable object args: Tuple of arguments to be passed to the target callable reload_delay: Optional delay in seconds before reloading the target callable (default is 0.5 seconds) Returns: None """self._target=targetself._args=argsself.should_exit=threading.Event()self.pid=os.getpid()self.reload_delay=reload_delayset_exit(lambda*_:self.should_exit.set())
defrun(self)->None:self.startup()whilenotself.should_exit.wait(self.reload_delay):ifself.should_restart():# pragma: no branchself.restart()self.shutdown()