如何正确处理多进程中的异常

How to correctly handle exceptions in multiprocessing

可以使用 Pool.map 检索工作人员的输出,但是当一个工作人员失败时,会引发异常并且无法再检索输出。因此,我的想法是将输出记录在进程同步队列中,以便检索所有成功工作人员的输出。

以下代码片段似乎有效:

from multiprocessing import Pool, Manager
from functools import partial

def f(x, queue):
    if x == 4:
        raise Exception("Error")

    queue.put_nowait(x)

if __name__ == '__main__':
    queue = Manager().Queue()
    pool = Pool(2)

    try:
        pool.map(partial(f, queue=queue), range(6))
        pool.close()
        pool.join()
    except:
        print("An error occurred")

    while not queue.empty():
        print("Output => " + str(queue.get()))

但我想知道在队列轮询阶段是否会出现竞争条件。我不确定当所有工作人员都完成后,队列进程是否一定还活着。从这个角度来看,您认为我的代码正确吗?

至于“如何正确处理异常”,这是你的主要问题:

首先,在您的情况下,您将永远无法执行 pool.closepool.join。但是 pool.map 不会 return 直到所有提交的任务都 return 编辑了他们的结果或产生了异常,所以你真的不需要调用这些来确保你提交的所有任务已经完成。如果不是辅助函数 f 将结果写入队列,只要您的任何任务导致异常,您就永远无法使用 map 取回任何结果。相反,您必须 apply_async 个单独的任务并为每个任务获取 AsyncResult 个实例。

所以我想说,在您的工作函数中处理异常而不必求助于使用队列的更好方法如下。但请注意,当您使用 apply_async 时,任务是一次提交一个任务,这会导致许多共享内存访问。只有当提交的任务数量非常大时,这才会真正成为一个性能问题。在这种情况下,辅助函数最好自己处理异常并以某种方式传回错误指示以允许使用 mapimap,您可以在其中指定 chunksize .

使用队列时,请注意写入托管队列会产生相当多的开销。第二段代码展示了如何通过使用 multiprocessing.Queue 实例来减少开销,与托管队列不同,该实例不使用代理。请注意输出顺序,这不是提交任务的顺序,而是任务完成的顺序——使用队列的另一个 潜在 缺点或优点(您可以使用如果您希望按顺序完成结果,请使用 apply_async 回调函数)。即使使用您的原始代码,您也不应依赖于队列中结果的顺序。

from multiprocessing import Pool

def f(x):
    if x == 4:
        raise Exception("Error")

    return x

if __name__ == '__main__':
    pool = Pool(2)
    results = [pool.apply_async(f, args=(x,)) for x in range(6)]
    for x, result in enumerate(results): # result is AsyncResult instance:
        try:
            return_value = result.get()
        except:
            print(f'An error occurred for x = {x}')
        else:
            print(f'For x = {x} the return value is {return_value}')

打印:

For x = 0 the return value is 0
For x = 1 the return value is 1
For x = 2 the return value is 2
For x = 3 the return value is 3
An error occurred for x = 4
For x = 5 the return value is 5

OP的原始代码修改为使用multiprocessing.Queue

from multiprocessing import Pool, Queue


def init_pool(q):
    global queue
    queue = q

def f(x):
    if x == 4:
        raise Exception("Error")

    queue.put_nowait(x)

if __name__ == '__main__':
    queue = Queue()
    pool = Pool(2, initializer=init_pool, initargs=(queue,))

    try:
        pool.map(f, range(6))
    except:
        print("An error occurred")

    while not queue.empty():
        print("Output => " + str(queue.get()))

打印:

An error occurred
Output => 0
Output => 2
Output => 3
Output => 1
Output => 5