如何在父进程死亡时终止 Python 的 `ProcessPoolExecutor`?
How to terminate Python's `ProcessPoolExecutor` when parent process dies?
如果父进程因任何原因终止,是否有办法使 concurrent.futures.ProcessPoolExecutor
中的进程终止?
一些细节:我在处理大量数据的作业中使用 ProcessPoolExecutor
。有时我需要使用 kill 命令终止父进程,但是当我这样做时,来自 ProcessPoolExecutor
的进程保持 运行 并且我也必须手动杀死它们。我的主要工作循环如下所示:
with concurrent.futures.ProcessPoolExecutor(n_workers) as executor:
result_list = [executor.submit(_do_work, data) for data in data_list]
for id, future in enumerate(
concurrent.futures.as_completed(result_list)):
print(f'{id}: {future.result()}')
有什么我可以在这里添加或做不同的事情来使 executor
中的子进程在父进程死亡时终止吗?
我建议进行两项更改:
- 使用
kill -15
命令,Python 程序可以将其作为 SIGTERM 信号处理,而不是 kill -9
命令。
- 使用使用
multiprocessing.pool.Pool
class 创建的多处理池,其 terminate
方法与 concurrent.futures.ProcessPoolExecutor
class 的工作方式完全不同,因为它将终止池中的所有进程,因此任何已提交的任务和 运行 也将立即终止。
使用新池并处理 SIGTERM 中断的等效程序为:
from multiprocessing import Pool
import signal
import sys
import os
...
def handle_sigterm(*args):
#print('Terminating...', file=sys.stderr, flush=True)
pool.terminate()
sys.exit(1)
# The process to be "killed", if necessary:
print(os.getpid(), file=sys.stderr)
pool = Pool(n_workers)
signal.signal(signal.SIGTERM, handle_sigterm)
results = pool.imap_unordered(_do_work, data_list)
for id, result in enumerate(results):
print(f'{id}: {result}')
您可以在每个进程中启动一个线程以在父进程死亡时终止:
def start_thread_to_terminate_when_parent_process_dies(ppid):
pid = os.getpid()
def f():
while True:
try:
os.kill(ppid, 0)
except OSError:
os.kill(pid, signal.SIGTERM)
time.sleep(1)
thread = threading.Thread(target=f, daemon=True)
thread.start()
用法:将 initializer
和 initargs
传递给 ProcessPoolExecutor
with concurrent.futures.ProcessPoolExecutor(
n_workers,
initializer=start_thread_to_terminate_when_parent_process_dies, # +
initargs=(os.getpid(),), # +
) as executor:
即使父进程是 SIGKILL
/kill -9
也能正常工作。
您可以 运行 kill-cgroup 中的脚本。当你需要杀死整个东西时,你可以使用 cgroup 的 kill switch 来完成。即使是 cpu-cgroup 也能达到目的,因为您可以访问该组的 pids。
检查 this article 如何使用 cgexec。
如果父进程因任何原因终止,是否有办法使 concurrent.futures.ProcessPoolExecutor
中的进程终止?
一些细节:我在处理大量数据的作业中使用 ProcessPoolExecutor
。有时我需要使用 kill 命令终止父进程,但是当我这样做时,来自 ProcessPoolExecutor
的进程保持 运行 并且我也必须手动杀死它们。我的主要工作循环如下所示:
with concurrent.futures.ProcessPoolExecutor(n_workers) as executor:
result_list = [executor.submit(_do_work, data) for data in data_list]
for id, future in enumerate(
concurrent.futures.as_completed(result_list)):
print(f'{id}: {future.result()}')
有什么我可以在这里添加或做不同的事情来使 executor
中的子进程在父进程死亡时终止吗?
我建议进行两项更改:
- 使用
kill -15
命令,Python 程序可以将其作为 SIGTERM 信号处理,而不是kill -9
命令。 - 使用使用
multiprocessing.pool.Pool
class 创建的多处理池,其terminate
方法与concurrent.futures.ProcessPoolExecutor
class 的工作方式完全不同,因为它将终止池中的所有进程,因此任何已提交的任务和 运行 也将立即终止。
使用新池并处理 SIGTERM 中断的等效程序为:
from multiprocessing import Pool
import signal
import sys
import os
...
def handle_sigterm(*args):
#print('Terminating...', file=sys.stderr, flush=True)
pool.terminate()
sys.exit(1)
# The process to be "killed", if necessary:
print(os.getpid(), file=sys.stderr)
pool = Pool(n_workers)
signal.signal(signal.SIGTERM, handle_sigterm)
results = pool.imap_unordered(_do_work, data_list)
for id, result in enumerate(results):
print(f'{id}: {result}')
您可以在每个进程中启动一个线程以在父进程死亡时终止:
def start_thread_to_terminate_when_parent_process_dies(ppid):
pid = os.getpid()
def f():
while True:
try:
os.kill(ppid, 0)
except OSError:
os.kill(pid, signal.SIGTERM)
time.sleep(1)
thread = threading.Thread(target=f, daemon=True)
thread.start()
用法:将 initializer
和 initargs
传递给 ProcessPoolExecutor
with concurrent.futures.ProcessPoolExecutor(
n_workers,
initializer=start_thread_to_terminate_when_parent_process_dies, # +
initargs=(os.getpid(),), # +
) as executor:
即使父进程是 SIGKILL
/kill -9
也能正常工作。
您可以 运行 kill-cgroup 中的脚本。当你需要杀死整个东西时,你可以使用 cgroup 的 kill switch 来完成。即使是 cpu-cgroup 也能达到目的,因为您可以访问该组的 pids。
检查 this article 如何使用 cgexec。