Python 多处理:出现第一个子错误时中止映射

Python multiprocessing: abort map on first child error

当其中一个子进程中止 and/or 抛出异常时,中止多处理的正确方法是什么?

我发现了各种各样的问题(generic multiprocessing error handling, ,...),但没有关于如何停止子异常多处理的明确答案。

例如,我期望以下代码:

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)


def main():
    with Pool(4) as p:
        try:
            r = p.map(f, range(7))
        except Exception as e:
            print(f"oops: {e}")
            p.close()
            p.terminate()
    print("end")


if __name__ == '__main__':
    main()

要输出:

f(0)
f(1)
f(2)
oops: float division by zero
end

相反,它对 detecting/handling 异常之前的所有项目应用 f 函数:

f(0)
f(1)
f(2)
f(4)
f(3)
f(5)
f(6)
oops: float division by zero
end

没有办法直接捕获异常吗?

我认为您将需要 apply_async 为此,您可以根据每个结果而不是累积结果采取行动。 pool.apply_async 提供了一个 error_callback 参数,您可以使用它来注册您的错误处理程序。 apply_async 没有阻塞,因此您需要 join() 池。我还使用标志 terminated 来了解在没有异常发生的情况下何时可以正常处理结果。

from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)

def on_error(e):
    global terminated
    terminated = True
    pool.terminate()
    print(f"oops:{e}")


def main():
    global pool
    global terminated

    terminated = False

    pool = Pool(4)
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(7)]
    pool.close()
    pool.join()

    if not terminated:
        for r in results:
            print(r.get())

    print("end")


if __name__ == '__main__':
    main()