多处理子进程随机接收 SIGTERM

Multiprocessing subprocesses randomly receive SIGTERMs

我在摆弄 multiprocessingsignal。 我正在创建一个池,并让工作人员捕获 SIGTERMs。 没有明显的原因,我观察到子进程随机接收 SIGTERMs。 这是一个 MWE:

import multiprocessing as mp
import signal
import os
import time

def start_process():
    print("Starting process #{}".format(os.getpid()))

def sigterm_handler(signo, _frame):
    print("Process #{} received a SIGTERM".format(os.getpid()))

def worker(i):
    time.sleep(1)

signal.signal(signal.SIGTERM, sigterm_handler)
while True:
    with mp.Pool(initializer=start_process) as pool:
        pool.map(worker, range(10))
    time.sleep(2)

输出:

Starting process #7735
Starting process #7736
Starting process #7737
Starting process #7738
Starting process #7739
Starting process #7740
Starting process #7741
Starting process #7742
Job done.
Starting process #7746
Starting process #7747
Starting process #7748
Starting process #7749
Starting process #7750
Starting process #7751
Starting process #7752
Starting process #7753
Process #7748 received a SIGTERM
Process #7746 received a SIGTERM
Job done.
Starting process #7757
Starting process #7758
Starting process #7759
Starting process #7760
Starting process #7761
Starting process #7762
Starting process #7763
Starting process #7764

如您所见,这看起来不可预测。

那么,这些 SIGTERM 是从哪里来的呢? 这是正常的吗? 我能保证工人们会完成他们的工作吗? 最后,让子进程捕获 SIGTERMs 可以吗?

我认为这很正常,但关于随机消息打印的问题我无话可说。您可以获得更多信息,将其插入主要内容:

mp.log_to_stderr(logging.DEBUG)

并更改 start_process():

def start_process():
    proc= mp.current_process()
    print("Starting process #{}, its name is {}".format(os.getpid(),proc.name))

这是正常的,当您的池在离开上下文管理器时正在执行 __exit__ 时可能会发生。 由于那时工人们已经完成了他们的工作,所以没有什么可担心的。 池本身导致 SIGTERM 对于没有退出代码的工作人员 池检查它。这在 Pool._terminate_pool-method (Python 3.7.1):

中触发
    # Terminate workers which haven't already finished.
    if pool and hasattr(pool[0], 'terminate'):
        util.debug('terminating workers')
        for p in pool:
            if p.exitcode is None:
                p.terminate()

池工作人员将在几行之后加入:

    if pool and hasattr(pool[0], 'terminate'):
        util.debug('joining pool workers')
        for p in pool:
            if p.is_alive():
                # worker has not yet exited
                util.debug('cleaning up worker %d' % p.pid)
                p.join()

在这样的情况下,您会在您的工作人员显式调用 pool.terminate() 时 仍然是 运行(例如你正在使用 pool.map_async 然后使用 pool.terminate()), 您的应用程序将死锁等待 p.join()(除非您让 sigterm_handler 最终调用 sys.exit())

如果不需要,最好不要乱用信号处理程序。