多处理对偶尔的故障具有鲁棒性
Multiprocessing Robust to Occasional Failures
我有 100-1000 个时间序列路径和一个相当昂贵的模拟,我想将其并行化。但是,我正在使用的库在极少数情况下会挂起,我想让它对这些问题具有鲁棒性。这是当前设置:
with Pool() as pool:
res = pool.map_async(simulation_that_occasionally_hangs, (p for p in paths))
all_costs = res.get()
我知道 get()
有一个 timeout
参数,但如果我理解正确的话,它适用于 1000 条路径的整个过程。我想做的是检查是否有任何 single 模拟花费的时间超过 5 分钟(正常路径需要 4 秒),如果是,则停止该路径并继续 get()
其余的。
编辑:
测试超时 pebble
def fibonacci(n):
if n == 0: return 0
elif n == 1: return 1
else: return fibonacci(n - 1) + fibonacci(n - 2)
def main():
with ProcessPool() as pool:
future = pool.map(fibonacci, range(40), timeout=10)
iterator = future.result()
all = []
while True:
try:
all.append(next(iterator))
except StopIteration:
break
except TimeoutError as e:
print(f'function took longer than {e.args[1]} seconds')
print(all)
错误:
RuntimeError: I/O operations still in flight while destroying Overlapped object, the process may crash
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\anaconda3\lib\multiprocessing\spawn.py", line 99, in spawn_main
new_handle = reduction.steal_handle(parent_pid, pipe_handle)
File "C:\anaconda3\lib\multiprocessing\reduction.py", line 87, in steal_handle
_winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
PermissionError: [WinError 5] Access is denied
可能最简单的方法是 运行 在单独的子进程中进行每个繁重的模拟,并由父进程监视。具体来说:
def risky_simulation(path):
...
def safe_simulation(path):
p = multiprocessing.Process(target=risky_simulation, args=(path,))
p.start()
p.join(timeout) # Your timeout here
p.kill() # or p.terminate()
# Here read and return the output of the simulation
# Can be from a file, or using some communication object
# between processes, from the `multiprocessing` module
with Pool() as pool:
res = pool.map_async(safe_simulation, paths)
all_costs = res.get()
备注:
- 如果模拟可能挂起,您可能希望在单独的进程中 运行 它(即
Process
对象不应该是一个线程),因为这取决于它是如何完成的,它可能赶上 GIL。
- 此解决方案仅将池用于直接子进程,但计算被卸载到新进程。我们还可以确保计算共享一个池,但这会导致代码更难看,所以我跳过了它。
pebble 库旨在解决此类问题。它透明地处理作业超时和失败,例如 C 库崩溃。
您可以查看 documentation 示例以了解如何使用它。它具有与 concurrent.futures
类似的界面。
我有 100-1000 个时间序列路径和一个相当昂贵的模拟,我想将其并行化。但是,我正在使用的库在极少数情况下会挂起,我想让它对这些问题具有鲁棒性。这是当前设置:
with Pool() as pool:
res = pool.map_async(simulation_that_occasionally_hangs, (p for p in paths))
all_costs = res.get()
我知道 get()
有一个 timeout
参数,但如果我理解正确的话,它适用于 1000 条路径的整个过程。我想做的是检查是否有任何 single 模拟花费的时间超过 5 分钟(正常路径需要 4 秒),如果是,则停止该路径并继续 get()
其余的。
编辑:
测试超时 pebble
def fibonacci(n):
if n == 0: return 0
elif n == 1: return 1
else: return fibonacci(n - 1) + fibonacci(n - 2)
def main():
with ProcessPool() as pool:
future = pool.map(fibonacci, range(40), timeout=10)
iterator = future.result()
all = []
while True:
try:
all.append(next(iterator))
except StopIteration:
break
except TimeoutError as e:
print(f'function took longer than {e.args[1]} seconds')
print(all)
错误:
RuntimeError: I/O operations still in flight while destroying Overlapped object, the process may crash
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\anaconda3\lib\multiprocessing\spawn.py", line 99, in spawn_main
new_handle = reduction.steal_handle(parent_pid, pipe_handle)
File "C:\anaconda3\lib\multiprocessing\reduction.py", line 87, in steal_handle
_winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
PermissionError: [WinError 5] Access is denied
可能最简单的方法是 运行 在单独的子进程中进行每个繁重的模拟,并由父进程监视。具体来说:
def risky_simulation(path):
...
def safe_simulation(path):
p = multiprocessing.Process(target=risky_simulation, args=(path,))
p.start()
p.join(timeout) # Your timeout here
p.kill() # or p.terminate()
# Here read and return the output of the simulation
# Can be from a file, or using some communication object
# between processes, from the `multiprocessing` module
with Pool() as pool:
res = pool.map_async(safe_simulation, paths)
all_costs = res.get()
备注:
- 如果模拟可能挂起,您可能希望在单独的进程中 运行 它(即
Process
对象不应该是一个线程),因为这取决于它是如何完成的,它可能赶上 GIL。 - 此解决方案仅将池用于直接子进程,但计算被卸载到新进程。我们还可以确保计算共享一个池,但这会导致代码更难看,所以我跳过了它。
pebble 库旨在解决此类问题。它透明地处理作业超时和失败,例如 C 库崩溃。
您可以查看 documentation 示例以了解如何使用它。它具有与 concurrent.futures
类似的界面。