IOError: Broken pipe with multiprocessing.Pool and multiprocessing.Manager after Ctrl+C


I'm using multiprocessing.Pool from Python's standard library to run a bunch of workers. Each worker starts subprocesses using Python's subprocess library and is responsible for managing those subprocesses and cleaning them up when they finish.

import multiprocessing as mp
import time


def main():
    processes = 3
    jobs = 6
    pool = mp.Pool(processes)
    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()


def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)


if __name__ == "__main__":
    main()

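The trouble started when I tried to catch KeyboardInterrupt gracefully, so that the script could be stopped from the command line with Ctrl+C. Note: this example is a scaled-down version of my actual program that illustrates the problem I ran into as closely as I could. I found these related posts on Stack Overflow: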

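The former was more applicable than the latter. While investigating, I found that pressing Ctrl+C on the command line sends signal.SIGINT to every process: the parent (main) process, all of its child processes, and their children in turn. This happens because the terminal delivers SIGINT to the entire foreground process group. FYI, I'm on Ubuntu 18.04 in a bash terminal.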
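A quick way to confirm that pool workers share the parent's process group (and therefore receive the same Ctrl+C) is to have each worker report its group ID. This is a minimal sketch assuming Python 3 on a POSIX system (os.getpgrp is not available on Windows); `report_process_group` and `same_group_as_parent` are hypothetical helper names.

```python
import multiprocessing as mp
import os


def report_process_group(_):
    # Runs in a pool worker: return the process group the worker belongs to.
    return os.getpgrp()


def same_group_as_parent(processes=2):
    with mp.Pool(processes) as pool:
        groups = pool.map(report_process_group, range(processes))
    # Children inherit the parent's process group unless they leave it.
    return all(group == os.getpgrp() for group in groups)


if __name__ == "__main__":
    print(same_group_as_parent())
```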

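I took the suggested approach and made the child processes ignore the interrupt signal. Because the pool forks its workers while SIGINT is set to SIG_IGN, the workers inherit that setting, and the main process gets its previous handler back when the block exits. For convenience and my own sanity, I wrote a context manager for it.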

import multiprocessing as mp
import contextlib # <---
import signal # <---
import time


def main():
    processes = 3
    jobs = 6
    with ignore_interrupt_signals(): # <---
        pool = mp.Pool(processes)

    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()


@contextlib.contextmanager
def ignore_interrupt_signals(): # <---
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        # restore the old handler even if the body raised
        signal.signal(signal.SIGINT, previous_handler)


def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)


if __name__ == "__main__":
    main()
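To check that the context manager does what it promises, the sketch below forks a pool inside the with block, asks each worker what SIGINT disposition it inherited, and verifies that the parent's handler is restored afterwards. It assumes Python 3 on POSIX with the fork start method (the question itself runs Python 2.7 on Ubuntu); `child_sigint_is_ignored` and `check_inheritance` are hypothetical names.

```python
import contextlib
import multiprocessing as mp
import signal


@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)


def child_sigint_is_ignored(_):
    # Runs inside a pool worker: report the SIGINT disposition it inherited.
    return signal.getsignal(signal.SIGINT) == signal.SIG_IGN


def check_inheritance(processes=2):
    ctx = mp.get_context("fork")  # assumption: POSIX fork start method
    before = signal.getsignal(signal.SIGINT)
    with ignore_interrupt_signals():
        pool = ctx.Pool(processes)  # workers are forked here, while SIGINT is ignored
    try:
        child_results = pool.map(child_sigint_is_ignored, range(4))
    finally:
        pool.close()
        pool.join()
    parent_restored = signal.getsignal(signal.SIGINT) == before
    return child_results, parent_restored


if __name__ == "__main__":
    print(check_inheritance())
```

One caveat with this approach: only workers forked inside the with block inherit SIG_IGN. If the pool ever replaces a worker (for example when maxtasksperchild is set), the replacement is forked after the handler has been restored and will not ignore SIGINT; passing an initializer to Pool avoids that.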

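This worked well, except that each worker needs a way to know it should shut down and, importantly, clean up all the subprocesses it spawned. For context, each worker takes about 45 minutes to finish.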
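For the cleanup part, one way a worker can watch both its subprocess and a stop flag is to poll them in a loop. This is a minimal sketch assuming the subprocess is launched with subprocess.Popen and that `stop_event` is any object with an is_set() method (a manager Event proxy in this question, a threading.Event in the test); `run_and_clean_up` and `poll_interval` are hypothetical names. On POSIX an ignored SIGINT survives fork and exec, so subprocesses started by workers that ignore SIGINT will usually ignore Ctrl+C too, which is why the worker has to stop them itself.

```python
import subprocess
import sys
import time


def run_and_clean_up(cmd, stop_event, poll_interval=0.1):
    """Run cmd, terminating it early if stop_event gets set.

    Returns the subprocess's exit code (negative on POSIX if it was
    killed by a signal).
    """
    proc = subprocess.Popen(cmd)
    try:
        while proc.poll() is None:          # still running
            if stop_event.is_set():         # the main process asked us to stop
                proc.terminate()            # SIGTERM on POSIX
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    proc.kill()             # escalate if SIGTERM is ignored
                    proc.wait()
                break
            time.sleep(poll_interval)
    finally:
        # Never leave a running child behind, even if the loop raised.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
    return proc.returncode


if __name__ == "__main__":
    import threading

    stop = threading.Event()
    stop.set()  # pretend Ctrl+C already happened
    print(run_and_clean_up([sys.executable, "-c", "import time; time.sleep(30)"], stop))
```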

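I figured the best approach was to use a multiprocessing.Event through a proxy. The event would be called stop_event and would be set if any process received a signal.SIGINT. For a multiprocessing.Event to be shared by the main process and the pool workers, it had to be managed by a multiprocessing.Manager, because a plain multiprocessing.Event cannot be pickled and sent to pool workers as a task argument.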
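The Manager-backed stop flag on its own can be sketched like this, assuming Python 3. `demo_worker` and `run_demo` are hypothetical names, and the polling loop stands in for the real 45-minute job. The proxy returned by manager.Event() is passed to apply_async like any other argument, and each worker checks it with is_set().

```python
import multiprocessing as mp
import time


def demo_worker(i, stop_event):
    # Placeholder for the real job: keep "working" until asked to stop.
    # is_set() only reads the flag; set() would set it.
    while not stop_event.is_set():
        time.sleep(0.05)
    # clean up would happen here
    return i


def run_demo(processes=2, jobs=4):
    manager = mp.Manager()
    stop_event = manager.Event()  # a proxy, so it can be pickled into a task
    pool = mp.Pool(processes)
    try:
        results = [pool.apply_async(demo_worker, (i, stop_event))
                   for i in range(jobs)]
        time.sleep(0.3)   # give the first workers time to start polling
        stop_event.set()  # every worker sees this on its next poll
        return sorted(r.get(timeout=10) for r in results)
    finally:
        pool.close()
        pool.join()
        manager.shutdown()


if __name__ == "__main__":
    print(run_demo())
```

Jobs that only start after the flag is set return immediately, which is usually what you want when cancelling queued work.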

import multiprocessing as mp
import contextlib
import signal
import sys
import time


def main():
    processes = 3
    jobs = 6

    with ignore_interrupt_signals():
        pool = mp.Pool(processes)

    manager = mp.Manager() # <---
    stop_event = manager.Event() # <---

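    try:
        for i in range(jobs):
            args = (i, stop_event)  # the worker needs the proxy as well
            pool.apply_async(worker, args)
        pool.close()  # normal path: wait for every job to finish
        pool.join()
    except KeyboardInterrupt: # <---
        stop_event.set() # <---
        pool.close()
        pool.join()
        sys.exit() # <---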


@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)


def worker(i, stop_event):
    # start processes
    while not stop_event.is_set(): # <--- is_set() checks the flag; set() would set it and return None
        # wait for completion
        time.sleep(1)
    # clean up


if __name__ == "__main__":
    main()

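Now the workers are safe from being interrupted before they can clean up their subprocesses, and the main process catches KeyboardInterrupt after the pool has been created. When the exception is caught, the pool is closed and its processes are joined. I thought this would work. However, I got an IOError from the stop_event.set() call.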

Termination started due to Ctrl+C, cleaning up...
Traceback (most recent call last):
  File "...", line 1109, in <module>
    main()
  File "...", line 42, in main
    args.func(args)
  File "...", line 189, in run_command
    stop_event.set()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 1011, in set
    return self._callmethod('set')
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

I have left out many other tracebacks, but the one of interest is the broken pipe raised when trying to set stop_event through the manager proxy.

multiprocessing.Manager() starts a server in a separate process, and the proxies talk to that server over a connection. Therefore, the manager process also receives the signal.SIGINT from Ctrl+C and terminates. This takes down the connection used by the stop_event proxy ungracefully. The best way I found to avoid this problem is to start the manager with signal.SIGINT ignored as well.

import multiprocessing as mp
import contextlib
import signal
import sys
import time


def main():
    processes = 3
    jobs = 6

    with ignore_interrupt_signals():
        pool = mp.Pool(processes)
        manager = mp.Manager() # <---

    stop_event = manager.Event()

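    try:
        for i in range(jobs):
            args = (i, stop_event)  # the worker needs the proxy as well
            pool.apply_async(worker, args)
        pool.close()  # normal path: wait for every job to finish
        pool.join()
    except KeyboardInterrupt:
        stop_event.set()  # the manager ignored SIGINT, so this call now succeeds
        pool.close()
        pool.join()
        sys.exit()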

@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)


def worker(i, stop_event):
    # start processes
    while not stop_event.is_set():  # is_set() checks the flag; set() would set it and return None
        # wait for completion
        time.sleep(1)
    # clean up


if __name__ == "__main__":
    main()

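I'm answering my own question because I spent a lot of time chasing socket and pipe errors through my codebase before arriving at this fairly simple solution, and I hope it helps others.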