多处理子进程随机接收 SIGTERM
Multiprocessing subprocesses randomly receive SIGTERMs
我在摆弄 multiprocessing
和 signal
。
我正在创建一个池,并让工作人员捕获 SIGTERM
s。
没有明显的原因,我观察到子进程随机接收 SIGTERM
s。
这是一个 MWE:
import multiprocessing as mp
import signal
import os
import time
def start_process():
print("Starting process #{}".format(os.getpid()))
def sigterm_handler(signo, _frame):
print("Process #{} received a SIGTERM".format(os.getpid()))
def worker(i):
time.sleep(1)
signal.signal(signal.SIGTERM, sigterm_handler)
while True:
with mp.Pool(initializer=start_process) as pool:
pool.map(worker, range(10))
time.sleep(2)
输出:
Starting process #7735
Starting process #7736
Starting process #7737
Starting process #7738
Starting process #7739
Starting process #7740
Starting process #7741
Starting process #7742
Job done.
Starting process #7746
Starting process #7747
Starting process #7748
Starting process #7749
Starting process #7750
Starting process #7751
Starting process #7752
Starting process #7753
Process #7748 received a SIGTERM
Process #7746 received a SIGTERM
Job done.
Starting process #7757
Starting process #7758
Starting process #7759
Starting process #7760
Starting process #7761
Starting process #7762
Starting process #7763
Starting process #7764
如您所见,这看起来不可预测。
那么,这些 SIGTERM
是从哪里来的呢?
这是正常的吗?
我能保证工人们会完成他们的工作吗?
最后,让子进程捕获 SIGTERM
s 可以吗?
我认为这很正常,但关于随机消息打印的问题我无话可说。您可以获得更多信息,将其插入主要内容:
mp.log_to_stderr(logging.DEBUG)
并更改 start_process():
def start_process():
proc= mp.current_process()
print("Starting process #{}, its name is {}".format(os.getpid(),proc.name))
这是正常的,当您的池在离开上下文管理器时正在执行 __exit__
时可能会发生。
由于那时工人们已经完成了他们的工作,所以没有什么可担心的。
池本身导致 SIGTERM
对于没有退出代码的工作人员
池检查它。这在 Pool._terminate_pool
-method (Python 3.7.1):
中触发
# Terminate workers which haven't already finished.
if pool and hasattr(pool[0], 'terminate'):
util.debug('terminating workers')
for p in pool:
if p.exitcode is None:
p.terminate()
池工作人员将在几行之后加入:
if pool and hasattr(pool[0], 'terminate'):
util.debug('joining pool workers')
for p in pool:
if p.is_alive():
# worker has not yet exited
util.debug('cleaning up worker %d' % p.pid)
p.join()
在这样的情况下,您会在您的工作人员显式调用 pool.terminate()
时
仍然是 运行(例如你正在使用 pool.map_async
然后使用 pool.terminate()
),
您的应用程序将死锁等待 p.join()
(除非您让 sigterm_handler
最终调用 sys.exit())
。
如果不需要,最好不要乱用信号处理程序。
我在摆弄 multiprocessing
和 signal
。
我正在创建一个池,并让工作人员捕获 SIGTERM
s。
没有明显的原因,我观察到子进程随机接收 SIGTERM
s。
这是一个 MWE:
import multiprocessing as mp
import signal
import os
import time
def start_process():
print("Starting process #{}".format(os.getpid()))
def sigterm_handler(signo, _frame):
print("Process #{} received a SIGTERM".format(os.getpid()))
def worker(i):
time.sleep(1)
signal.signal(signal.SIGTERM, sigterm_handler)
while True:
with mp.Pool(initializer=start_process) as pool:
pool.map(worker, range(10))
time.sleep(2)
输出:
Starting process #7735
Starting process #7736
Starting process #7737
Starting process #7738
Starting process #7739
Starting process #7740
Starting process #7741
Starting process #7742
Job done.
Starting process #7746
Starting process #7747
Starting process #7748
Starting process #7749
Starting process #7750
Starting process #7751
Starting process #7752
Starting process #7753
Process #7748 received a SIGTERM
Process #7746 received a SIGTERM
Job done.
Starting process #7757
Starting process #7758
Starting process #7759
Starting process #7760
Starting process #7761
Starting process #7762
Starting process #7763
Starting process #7764
如您所见,这看起来不可预测。
那么,这些 SIGTERM
是从哪里来的呢?
这是正常的吗?
我能保证工人们会完成他们的工作吗?
最后,让子进程捕获 SIGTERM
s 可以吗?
我认为这很正常,但关于随机消息打印的问题我无话可说。您可以获得更多信息,将其插入主要内容:
mp.log_to_stderr(logging.DEBUG)
并更改 start_process():
def start_process():
proc= mp.current_process()
print("Starting process #{}, its name is {}".format(os.getpid(),proc.name))
这是正常的,当您的池在离开上下文管理器时正在执行 __exit__
时可能会发生。
由于那时工人们已经完成了他们的工作,所以没有什么可担心的。
池本身导致 SIGTERM
对于没有退出代码的工作人员
池检查它。这在 Pool._terminate_pool
-method (Python 3.7.1):
# Terminate workers which haven't already finished.
if pool and hasattr(pool[0], 'terminate'):
util.debug('terminating workers')
for p in pool:
if p.exitcode is None:
p.terminate()
池工作人员将在几行之后加入:
if pool and hasattr(pool[0], 'terminate'):
util.debug('joining pool workers')
for p in pool:
if p.is_alive():
# worker has not yet exited
util.debug('cleaning up worker %d' % p.pid)
p.join()
在这样的情况下,您会在您的工作人员显式调用 pool.terminate()
时
仍然是 运行(例如你正在使用 pool.map_async
然后使用 pool.terminate()
),
您的应用程序将死锁等待 p.join()
(除非您让 sigterm_handler
最终调用 sys.exit())
。
如果不需要,最好不要乱用信号处理程序。