Python 多处理:出现第一个子错误时中止映射
Python multiprocessing: abort map on first child error
当其中一个子进程中止 and/or 抛出异常时,中止多处理的正确方法是什么?
我发现了各种各样的问题(generic multiprocessing error handling, ,...),但没有关于如何停止子异常多处理的明确答案。
例如,我期望以下代码:
def f(x):
sleep(x)
print(f"f({x})")
return 1.0 / (x - 2)
def main():
with Pool(4) as p:
try:
r = p.map(f, range(7))
except Exception as e:
print(f"oops: {e}")
p.close()
p.terminate()
print("end")
if __name__ == '__main__':
main()
要输出:
f(0)
f(1)
f(2)
oops: float division by zero
end
相反,它对 detecting/handling 异常之前的所有项目应用 f
函数:
f(0)
f(1)
f(2)
f(4)
f(3)
f(5)
f(6)
oops: float division by zero
end
没有办法直接捕获异常吗?
我认为您将需要 apply_async
为此,您可以根据每个结果而不是累积结果采取行动。 pool.apply_async
提供了一个 error_callback
参数,您可以使用它来注册您的错误处理程序。 apply_async
没有阻塞,因此您需要 join()
池。我还使用标志 terminated
来了解在没有异常发生的情况下何时可以正常处理结果。
from time import sleep
from multiprocessing import Pool
def f(x):
sleep(x)
print(f"f({x})")
return 1.0 / (x - 2)
def on_error(e):
global terminated
terminated = True
pool.terminate()
print(f"oops:{e}")
def main():
global pool
global terminated
terminated = False
pool = Pool(4)
results = [pool.apply_async(f, (x,), error_callback=on_error)
for x in range(7)]
pool.close()
pool.join()
if not terminated:
for r in results:
print(r.get())
print("end")
if __name__ == '__main__':
main()
当其中一个子进程中止 and/or 抛出异常时,中止多处理的正确方法是什么?
我发现了各种各样的问题(generic multiprocessing error handling,
例如,我期望以下代码:
def f(x):
sleep(x)
print(f"f({x})")
return 1.0 / (x - 2)
def main():
with Pool(4) as p:
try:
r = p.map(f, range(7))
except Exception as e:
print(f"oops: {e}")
p.close()
p.terminate()
print("end")
if __name__ == '__main__':
main()
要输出:
f(0)
f(1)
f(2)
oops: float division by zero
end
相反,它对 detecting/handling 异常之前的所有项目应用 f
函数:
f(0)
f(1)
f(2)
f(4)
f(3)
f(5)
f(6)
oops: float division by zero
end
没有办法直接捕获异常吗?
我认为您将需要 apply_async
为此,您可以根据每个结果而不是累积结果采取行动。 pool.apply_async
提供了一个 error_callback
参数,您可以使用它来注册您的错误处理程序。 apply_async
没有阻塞,因此您需要 join()
池。我还使用标志 terminated
来了解在没有异常发生的情况下何时可以正常处理结果。
from time import sleep
from multiprocessing import Pool
def f(x):
sleep(x)
print(f"f({x})")
return 1.0 / (x - 2)
def on_error(e):
global terminated
terminated = True
pool.terminate()
print(f"oops:{e}")
def main():
global pool
global terminated
terminated = False
pool = Pool(4)
results = [pool.apply_async(f, (x,), error_callback=on_error)
for x in range(7)]
pool.close()
pool.join()
if not terminated:
for r in results:
print(r.get())
print("end")
if __name__ == '__main__':
main()