如何正确处理多进程中的异常
How to correctly handle exceptions in multiprocessing
可以使用 Pool.map 检索工作人员的输出,但是当一个工作人员失败时,会引发异常并且无法再检索输出。因此,我的想法是将输出记录在进程同步队列中,以便检索所有成功工作人员的输出。
以下代码片段似乎有效:
from multiprocessing import Pool, Manager
from functools import partial
def f(x, queue):
if x == 4:
raise Exception("Error")
queue.put_nowait(x)
if __name__ == '__main__':
queue = Manager().Queue()
pool = Pool(2)
try:
pool.map(partial(f, queue=queue), range(6))
pool.close()
pool.join()
except:
print("An error occurred")
while not queue.empty():
print("Output => " + str(queue.get()))
但我想知道在队列轮询阶段是否会出现竞争条件。我不确定当所有工作人员都完成后,队列进程是否一定还活着。从这个角度来看,您认为我的代码正确吗?
至于“如何正确处理异常”,这是你的主要问题:
首先,在您的情况下,您将永远无法执行 pool.close
和 pool.join
。但是 pool.map
不会 return 直到所有提交的任务都 return 编辑了他们的结果或产生了异常,所以你真的不需要调用这些来确保你提交的所有任务已经完成。如果不是辅助函数 f
将结果写入队列,只要您的任何任务导致异常,您就永远无法使用 map
取回任何结果。相反,您必须 apply_async
个单独的任务并为每个任务获取 AsyncResult
个实例。
所以我想说,在您的工作函数中处理异常而不必求助于使用队列的更好方法如下。但请注意,当您使用 apply_async
时,任务是一次提交一个任务,这会导致许多共享内存访问。只有当提交的任务数量非常大时,这才会真正成为一个性能问题。在这种情况下,辅助函数最好自己处理异常并以某种方式传回错误指示以允许使用 map
或 imap
,您可以在其中指定 chunksize
.
使用队列时,请注意写入托管队列会产生相当多的开销。第二段代码展示了如何通过使用 multiprocessing.Queue
实例来减少开销,与托管队列不同,该实例不使用代理。请注意输出顺序,这不是提交任务的顺序,而是任务完成的顺序——使用队列的另一个 潜在 缺点或优点(您可以使用如果您希望按顺序完成结果,请使用 apply_async
回调函数)。即使使用您的原始代码,您也不应依赖于队列中结果的顺序。
from multiprocessing import Pool
def f(x):
if x == 4:
raise Exception("Error")
return x
if __name__ == '__main__':
pool = Pool(2)
results = [pool.apply_async(f, args=(x,)) for x in range(6)]
for x, result in enumerate(results): # result is AsyncResult instance:
try:
return_value = result.get()
except:
print(f'An error occurred for x = {x}')
else:
print(f'For x = {x} the return value is {return_value}')
打印:
For x = 0 the return value is 0
For x = 1 the return value is 1
For x = 2 the return value is 2
For x = 3 the return value is 3
An error occurred for x = 4
For x = 5 the return value is 5
OP的原始代码修改为使用multiprocessing.Queue
from multiprocessing import Pool, Queue
def init_pool(q):
global queue
queue = q
def f(x):
if x == 4:
raise Exception("Error")
queue.put_nowait(x)
if __name__ == '__main__':
queue = Queue()
pool = Pool(2, initializer=init_pool, initargs=(queue,))
try:
pool.map(f, range(6))
except:
print("An error occurred")
while not queue.empty():
print("Output => " + str(queue.get()))
打印:
An error occurred
Output => 0
Output => 2
Output => 3
Output => 1
Output => 5
可以使用 Pool.map 检索工作人员的输出,但是当一个工作人员失败时,会引发异常并且无法再检索输出。因此,我的想法是将输出记录在进程同步队列中,以便检索所有成功工作人员的输出。
以下代码片段似乎有效:
from multiprocessing import Pool, Manager
from functools import partial
def f(x, queue):
if x == 4:
raise Exception("Error")
queue.put_nowait(x)
if __name__ == '__main__':
queue = Manager().Queue()
pool = Pool(2)
try:
pool.map(partial(f, queue=queue), range(6))
pool.close()
pool.join()
except:
print("An error occurred")
while not queue.empty():
print("Output => " + str(queue.get()))
但我想知道在队列轮询阶段是否会出现竞争条件。我不确定当所有工作人员都完成后,队列进程是否一定还活着。从这个角度来看,您认为我的代码正确吗?
至于“如何正确处理异常”,这是你的主要问题:
首先,在您的情况下,您将永远无法执行 pool.close
和 pool.join
。但是 pool.map
不会 return 直到所有提交的任务都 return 编辑了他们的结果或产生了异常,所以你真的不需要调用这些来确保你提交的所有任务已经完成。如果不是辅助函数 f
将结果写入队列,只要您的任何任务导致异常,您就永远无法使用 map
取回任何结果。相反,您必须 apply_async
个单独的任务并为每个任务获取 AsyncResult
个实例。
所以我想说,在您的工作函数中处理异常而不必求助于使用队列的更好方法如下。但请注意,当您使用 apply_async
时,任务是一次提交一个任务,这会导致许多共享内存访问。只有当提交的任务数量非常大时,这才会真正成为一个性能问题。在这种情况下,辅助函数最好自己处理异常并以某种方式传回错误指示以允许使用 map
或 imap
,您可以在其中指定 chunksize
.
使用队列时,请注意写入托管队列会产生相当多的开销。第二段代码展示了如何通过使用 multiprocessing.Queue
实例来减少开销,与托管队列不同,该实例不使用代理。请注意输出顺序,这不是提交任务的顺序,而是任务完成的顺序——使用队列的另一个 潜在 缺点或优点(您可以使用如果您希望按顺序完成结果,请使用 apply_async
回调函数)。即使使用您的原始代码,您也不应依赖于队列中结果的顺序。
from multiprocessing import Pool
def f(x):
if x == 4:
raise Exception("Error")
return x
if __name__ == '__main__':
pool = Pool(2)
results = [pool.apply_async(f, args=(x,)) for x in range(6)]
for x, result in enumerate(results): # result is AsyncResult instance:
try:
return_value = result.get()
except:
print(f'An error occurred for x = {x}')
else:
print(f'For x = {x} the return value is {return_value}')
打印:
For x = 0 the return value is 0
For x = 1 the return value is 1
For x = 2 the return value is 2
For x = 3 the return value is 3
An error occurred for x = 4
For x = 5 the return value is 5
OP的原始代码修改为使用multiprocessing.Queue
from multiprocessing import Pool, Queue
def init_pool(q):
global queue
queue = q
def f(x):
if x == 4:
raise Exception("Error")
queue.put_nowait(x)
if __name__ == '__main__':
queue = Queue()
pool = Pool(2, initializer=init_pool, initargs=(queue,))
try:
pool.map(f, range(6))
except:
print("An error occurred")
while not queue.empty():
print("Output => " + str(queue.get()))
打印:
An error occurred
Output => 0
Output => 2
Output => 3
Output => 1
Output => 5