如何处理 SIGTERM 并且仍然可以正常工作 process.terminate()

How to process SIGTERM and still have a working process.terminate()

我正在尝试找到一种方法来很好地处理 SIGTERM,并在主进程收到 SIGTERM 时让我的子进程终止。

基本上,我是手动创建流程(但我相信问题与 mp.pool 相同)

import multiprocessing as mp

...

workers = [
    mp.Process(
        target=worker,
        args=(...,)
    ) for _ in range(nb_workers)
]

我正在捕捉信号

signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGABRT, term)

当捕获到信号时,我想终止所有子进程并退出。我不想等他们完成 运行ning 因为他们个人的 运行 时间可能会很长(理解几分钟)。

同样,我无法真正设置 threading.Event() 所有进程都会定期查看,因为它们基本上只是在做一个巨大但缓慢的操作(取决于一些库)。

我的想法是在捕获到信号时设置一个标志,然后让看门狗在设置标志时终止所有子进程。但是使用 .terminate() 也会使用 SIGTERM,它会再次被我的信号处理程序捕获。

例如,简化代码:

import multiprocessing as mp
import signal
import time

FLAG = False


def f(x):
    time.sleep(5)
    print(x)
    return x * x


def term(signum, frame):
    print(f'Received Signal {signum}')
    global FLAG
    FLAG = True


def terminate(w):
    for process in w:
        print('Terminating worker {}'.format(process.pid))
        process.terminate()
        process.join()
        process.close()


signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGABRT, term)


if __name__ == '__main__':
    workers = [
        mp.Process(
            target=f,
            args=(i,)
        ) for i in range(4)
    ]
    for process in workers:
        process.start()
    while not FLAG:
        time.sleep(0.1)
    print('flag set')
    terminate(workers)
    print('Done')

如果我在进程完成之前中断代码(使用 ctrl-c):

Received Signal 2
Received Signal 2
Received Signal 2
Received Signal 2
Received Signal 2

flag set
Terminating worker 27742
Received Signal 15
0
Terminating worker 27743
Received Signal 15
1
3
2
Terminating worker 27744
Terminating worker 27745
Done

如您所见,似乎 .terminate() 不会终止子进程,因为它们保持 运行 结束,而且看起来我们也捕获了生成的 SIGTERM (15)。

到目前为止,我的解决方案是:

有什么干净的方法来处理这个问题吗?

解决方案在很大程度上取决于您 运行 使用的平台,因为 Python 标记为 [multiprocessing] 的问题通常就是这种情况,因此也应该有一个也用特定平台标记此类问题,例如[linux]。我推断您的平台不是 Windows,因为没有为该平台定义 signal.SIGQUIT。所以我会选择 Linux.

  1. 对于 Linux 你根本不希望你的子进程处理信号(例如,它们在 Ctrl-C 中断上调用函数 term 有点荒谬) .然而,对于 Windows,您希望您的子进程忽略这些中断。这意味着您希望您的主进程在 创建子进程后仅 调用 signal
  2. 与其使用 FLAG 指示主进程应该终止并且必须让主进程循环定期测试该值,不如让主进程等待更简单、更干净、更高效一个 threading.Event 实例,done_event。虽然。出于某种原因,这似乎不适用于 Windows;主进程 wait 调用不会立即得到满足。
  3. 如果您的进程正常完成并且已触发信号,您希望一些规定能够正常终止。实现包括此在内的所有目标的最简单方法是使子进程守护进程在主进程终止时终止。然后创建一个守护线程,它只是等待子进程正常终止并在发生时设置 done_event 。因此,主进程将在 某种中断或正常完成时调用 done_event.wait() 失败。现在只需正常结束即可;无需针对子流程调用 terminate,因为它们将在主流程结束时结束。
import multiprocessing as mp
from threading import Thread, Event
import signal
import time
import sys


IS_WINDOWS = sys.platform == 'win32'

def f(x):
    if IS_WINDOWS:
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGABRT, signal.SIG_IGN)

    time.sleep(5)
    print(x)
    return x * x

def term(signum, frame):
    print(f'Received Signal {signum}')
    if IS_WINDOWS:
        globals()['FLAG'] = True
    else:
        done_event.set()

def process_wait_thread():
    """
    wait for processes to finish normally and set done_event
    """
    for process in workers:
        process.join()

    if IS_WINDOWS:
        globals()['FLAG'] = True
    else:
        done_event.set()

if __name__ == '__main__':

    if IS_WINDOWS:
        globals()['FLAG'] = False
    else:
        done_event = Event()

    workers = [
        mp.Process(
            target=f,
            args=(i,),
            daemon=True
        ) for i in range(4)
    ]
    for process in workers:
        process.start()

    # We don't want subprocesses to inherit these so
    # call signal after we start the processes:
    signal.signal(signal.SIGTERM, term)
    signal.signal(signal.SIGINT, term)
    if not IS_WINDOWS:
        signal.signal(signal.SIGQUIT, term) # Not supported by Windows at all
    signal.signal(signal.SIGABRT, term)

    Thread(target=process_wait_thread, daemon=True).start()

    if IS_WINDOWS:
        while not globals()['FLAG']:
            time.sleep(0.1)
    else:
        done_event.wait()

    print('Done')