我怎样才能停止脚本而不必等待为间隔设置的时间过去?

How could I stop the script without having to wait for the time set for interval to pass?

在这个脚本中,我希望启动一个给定的程序并在该程序存在时对其进行监视。因此,我达到了使用线程模块 Timer 方法来控制循环的地步,该循环写入文件并向控制台打印出启动进程的特定统计信息(对于本例,mspaint)。

当我在控制台中按 CTRL + C 或关闭 mspaint 时,问题出现了,脚本仅在为间隔定义的时间完全 运行 后捕获 2 个事件中的任何一个.这些事件使脚本停止。

例如,如果为间隔设置了 20 秒的时间,一旦脚本开始,如果在第 5 秒我按下 CTRL + C 或关闭 mspaint,脚本将仅在剩余的 15 秒后停止已经过去了。

我希望当我按下 CTRL + C 或关闭 mspaint(或通过此脚本启动的任何其他进程)时脚本立即停止。

根据示例,脚本可以与以下命令一起使用: python.exe mon_tool.py -p "C:\Windows\System32\mspaint.exe" -i 20

如果你能想出一个可行的例子,我将不胜感激。

我用过 python 3.10.4 和 psutil 5.9.0 .

这是代码:

# mon_tool.py

import psutil, sys, os, argparse
from subprocess import Popen
from threading import Timer
debug = False


def parse_args(args):   
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--path", type=str, required=True)
    parser.add_argument("-i", "--interval", type=float, required=True)
    return parser.parse_args(args)

def exceptionHandler(exception_type, exception, traceback, debug_hook=sys.excepthook):
    '''Print user friendly error messages normally, full traceback if DEBUG on.
       Adapted from 
    '''
    if debug:
        print('\n*** Error:')
        debug_hook(exception_type, exception, traceback)
    else:
        print("%s: %s" % (exception_type.__name__, exception))
sys.excepthook = exceptionHandler
      
def validate(data):
    try:
        if data.interval < 0:            
            raise ValueError
    except ValueError:        
        raise ValueError(f"Time has a negative value: {data.interval}. Please use a positive value")
def main():
    args = parse_args(sys.argv[1:])
    validate(args)


    # creates the "Process monitor data" folder in the "Documents" folder
    # of the current Windows profile
    default_path: str = f"{os.path.expanduser('~')}\Documents\Process monitor data"
    if not os.path.exists(default_path):
        os.makedirs(default_path)  

    abs_path: str = f'{default_path}\data_test.txt'

    print("data_test.txt can be found in: " + default_path)


    # launches the provided process for the path argument, and
    # it checks if the process was indeed launched
    p: Popen[bytes] = Popen(args.path)
    PID = p.pid    
    isProcess: bool = True
    while isProcess:
        for proc in psutil.process_iter():
            if(proc.pid == PID):
                isProcess = False

    process_stats = psutil.Process(PID)

    # creates the data_test.txt and it erases its content
    with open(abs_path, 'w', newline='', encoding='utf-8') as testfile:
            testfile.write("")
             

    # loop for writing the handles count to data_test.txt, and
    # for printing out the handles count to the console
    def process_monitor_loop():      
        with open(abs_path, 'a', newline='', encoding='utf-8') as testfile:
            testfile.write(f"{process_stats.num_handles()}\n")
            print(process_stats.num_handles())
        Timer(args.interval, process_monitor_loop).start() 
    process_monitor_loop()
                      

if __name__ == '__main__':
    main()

谢谢!

您可以尝试为 SIGINT 注册一个信号处理程序,这样每当用户按下 Ctrl+C 时,您就可以拥有一个自定义处理程序来清除所有依赖项,例如间隔,然后优雅地退出。 请参阅 this 了解简单的实现。

我认为您可以使用 python-worker (link) 作为备选方案

import time
from datetime import datetime
from worker import worker, enableKeyboardInterrupt

# make sure to execute this before running the worker to enable keyboard interrupt
enableKeyboardInterrupt()


# your codes
...


# block lines with periodic check
def block_next_lines(duration):
    t0 = time.time()
    while time.time() - t0 <= duration:
        time.sleep(0.05) # to reduce resource consumption


def main():
    # your codes
    ...

    @worker(keyboard_interrupt=True)
    def process_monitor_loop():
        while True:
            print("hii", datetime.now().isoformat())
            block_next_lines(3)
    
    return process_monitor_loop()

if __name__ == '__main__':
    main_worker = main()
    main_worker.wait()

此处您的 process_monitor_loop 将能够停止,即使它不是恰好 20 秒的间隔

这是问题第二部分的解决方案,检查启动的进程是否存在。如果它不存在,它会停止脚本。

此解决方案位于解决方案之上,针对问题的第一部分,上面由 @danangjoyoo 提供,它处理在使用 CTRL + C 时停止脚本。

再次感谢您,@danangjoyoo! :)

这是问题第二部分的代码:

import time, psutil, sys, os
from datetime import datetime
from worker import worker, enableKeyboardInterrupt, abort_all_thread, ThreadWorkerManager
from threading import Timer

# make sure to execute this before running the worker to enable keyboard interrupt
enableKeyboardInterrupt()


# block lines with periodic check
def block_next_lines(duration):
    t0 = time.time()
    while time.time() - t0 <= duration:
        time.sleep(0.05) # to reduce resource consumption


def main():

    # launches mspaint, gets its PID and checks if it was indeed launched
    path = f"C:\Windows\System32\mspaint.exe"
    p = psutil.Popen(path)
    PID = p.pid    
    isProcess: bool = True
    while isProcess:
        for proc in psutil.process_iter():
            if(proc.pid == PID):
                isProcess = False

    interval = 5
    global counter
    counter = 0

    #allows for sub_process to run only once
    global run_sub_process_once
    run_sub_process_once = 1

    @worker(keyboard_interrupt=True)
    def process_monitor_loop():
        while True:
            print("hii", datetime.now().isoformat())


            def sub_proccess():
                '''
                Checks every second if the launched process still exists.
                If the process doesn't exist anymore, the script will be stopped.
                '''

                print("Process online:", psutil.pid_exists(PID))
                t = Timer(1, sub_proccess)
                t.start()
                global counter
                counter += 1 
                print(counter)

                # Checks if the worker thread is alive.
                # If it is not alive, it will kill the thread spawned by sub_process
                # hence, stopping the script.
                for _, key in enumerate(ThreadWorkerManager.allWorkers):
                    w = ThreadWorkerManager.allWorkers[key]
                    if not w.is_alive:
                        t.cancel()

                if not psutil.pid_exists(PID):
                    abort_all_thread()
                    t.cancel()

            global run_sub_process_once
            if run_sub_process_once:
                run_sub_process_once = 0
                sub_proccess()

            block_next_lines(interval)
    
    return process_monitor_loop()

if __name__ == '__main__':
    main_worker = main()
    main_worker.wait()

此外,我必须指出 @danangjoyoo 的解决方案是 Windows 的 signal.pause() 的替代方案。这只涉及 CTRL + C 问题部分。 signal.pause() 仅适用于 Unix 系统。如果是 Unix 系统,就我而言,这就是它应该如何使用:

import signal, sys
from threading import Timer
 
def main():
    def signal_handler(sig, frame):
        print('\nYou pressed Ctrl+C!')
        sys.exit(0)
 
    signal.signal(signal.SIGINT, signal_handler)
    print('Press Ctrl+C')
    def process_monitor_loop():      
        try:
            print("hi")
        except KeyboardInterrupt:    
            signal.pause()
        Timer(10, process_monitor_loop).start() 
    process_monitor_loop()
 
 
if __name__ == '__main__':
    main()

以上代码基于this.