How can I prevent parallel python from quitting with "OSError: [Errno 35] Resource temporarily unavailable"?

How can I prevent parallel python from quitting with "OSError: [Errno 35] Resource temporarily unavailable"?

上下文

我正在尝试 运行 1000 次模拟,其中涉及 (1) 损坏道路网络,然后 (2) 测量由于损坏造成的交通延误。步骤 (1) 和 (2) 都涉及创建多个 "maps"。在步骤 (1) 中,我创建了 30 张损坏贴图。在步骤 (2) 中,我测量了这 30 张损坏地图中每一张的交通延迟。该函数然后 returns 30 个损坏地图上的平均交通延迟,并继续 运行 下一个模拟。设置的伪代码如下所示:

for i in range(0,1000): # for each simulation
    create 30 damage maps using parallel python
    measure the traffic delay of each damage map using parallel 
    python
    compute the average traffic delay for simulation i

由于地图彼此独立,我在每一步都使用并行 python 包。

问题 -- 错误消息

代码在第 72 次模拟(共 1000 次)前后两次抛出以下错误,并在步骤 (1) 期间停止了 运行ning,这涉及损坏桥梁。

An error has occurred during the function execution
Traceback (most recent call last):
File "/Library/Python/2.7/site-packages/ppworker.py", line 90, in run
__result = __f(*__args)
File "<string>", line 4, in compute_damage
File "<string>", line 3, in damage_bridges
File "/Library/Python/2.7/site-packages/scipy/stats/__init__.py", line 345, in <module>
from .stats import *
File "/Library/Python/2.7/site-packages/scipy/stats/stats.py", line 171, in <module>
from . import distributions
File "/Library/Python/2.7/site-packages/scipy/stats/distributions.py", line 10, in <module>
from ._distn_infrastructure import (entropy, rv_discrete, rv_continuous,
File "/Library/Python/2.7/site-packages/scipy/stats/_distn_infrastructure.py", line 16, in <module>
from scipy.misc import doccer
File "/Library/Python/2.7/site-packages/scipy/misc/__init__.py", line 68, in <module>
from scipy.interpolate._pade import pade as _pade
File "/Library/Python/2.7/site-packages/scipy/interpolate/__init__.py", line 175, in <module>
from .interpolate import *
File "/Library/Python/2.7/site-packages/scipy/interpolate/interpolate.py", line 32, in <module>
from .interpnd import _ndim_coords_from_arrays
File "interpnd.pyx", line 1, in init scipy.interpolate.interpnd
File "/Library/Python/2.7/site-packages/scipy/spatial/__init__.py", line 95, in <module>
from .ckdtree import *
File "ckdtree.pyx", line 31, in init scipy.spatial.ckdtree
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 123, in cpu_count
with os.popen(comm) as p:
OSError: [Errno 35] Resource temporarily unavailable

系统和版本

我 运行宁 Python 2.7 在 PyCharm 虚拟环境中并行 python (pp) 1.6.5。我的电脑 运行s Mac OS High Sierra 10.13.3,内存为 8 GB 1867 MHz DDR3。

尝试修复

我了解到问题出在并行 python 包或我如何使用它,但我不知道如何解决这个问题。它被认为是 parallel python page 上的错误 -- wkerzendorf 发布在那里:

Q: I get a Socket Error/Memory Error when using jobs that use >os.system calls

A: The fix I found is using subprocess.Popen and poping the >stdout,stderr into the subprocess.PIPE. here is an example: subprocess.Popen(['ls ->rtl'],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True). That >fixed the error for me.

但是,我完全不确定在哪里进行此修改。

我还了解到问题可能出在我的系统限制上,根据这个 Ghost in the Machines blog post。但是,当我尝试重新配置最大文件数和最大用户进程时,我在终端中收到以下消息:

Could not set resource limits: 1: Operation not permitted

使用并行代码python

我正在使用的代码相当复杂(需要多个输入文件到 运行)所以恐怕我不能在这里提供一个最小的、可重现的例子。您可以下载 运行 代码版本 at this link

下面我包含了步骤 (1) 的代码,其中我使用并行 python 创建 30 个损坏贴图。工人人数为4.

ppservers = ()    #starting a super cool parallelization
# Creates jobserver with automatically detected number of workers
job_server = pp.Server(ppservers=ppservers)
print "Starting pp with", job_server.get_ncpus(), "workers"

# set up jobs
jobs = []
for i in targets:
    jobs.append(job_server.submit(compute_damage, (lnsas[i%len(lnsas)], napa_dict, targets[i], i%sets, U[i%sets][:] ), modules = ('random', 'math', ), depfuncs = (damage_bridges, )))

# get the results that have already run
bridge_array_new = []
bridge_array_internal = []
indices_array = []
bridge_array_hwy_num = []
for job in jobs:
    (index, damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges_road) = job()
    bridge_array_internal.append(damaged_bridges_internal)
    bridge_array_new.append(damaged_bridges_new)
    indices_array.append(index)
    bridge_array_hwy_num.append(num_damaged_bridges_road)

附加功能

compute_damage 函数如下所示。

def compute_damage(scenario, master_dict, index, scenario_index, U):
'''goes from ground-motion intensity map to damage map '''
#figure out component damage for each ground-motion intensity map
damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges = damage_bridges(scenario, master_dict, scenario_index, U) #e.g., [1, 89, 598] #num_bridges_out is highway bridges only
return index, damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges

damage_bridges 函数如下所示。

def damage_bridges(scenario, master_dict, scenario_index, u):
'''This function damages bridges based on the ground shaking values (demand) and the structural capacity (capacity). It returns two lists (could be empty) with damaged bridges (same thing, just different bridge numbering'''
from scipy.stats import norm
damaged_bridges_new = []
damaged_bridges_internal = []

#first, highway bridges and overpasses
beta = 0.6 #you may want to change this by removing this line and making it a dictionary lookup value 3 lines below
i = 0 # counter for bridge index
for site in master_dict.keys(): #1-1889 in Matlab indices (start at 1)
    lnSa = scenario[master_dict[site]['new_id'] - 1]
    prob_at_least_ext = norm.cdf((1/float(beta)) * (lnSa - math.log(master_dict[site]['ext_lnSa'])), 0, 1) #want to do moderate damage state instead of extensive damage state as we did here, then just change the key name here (see master_dict description)
    #U = random.uniform(0, 1)
    if u[i] <= prob_at_least_ext:
        damaged_bridges_new.append(master_dict[site]['new_id']) #1-1743
        damaged_bridges_internal.append(site) #1-1889
    i += 1 # increment bridge index

# GB ADDITION -- to use with master_dict = napa_dict, since napa_dict only has 40 bridges
num_damaged_bridges = sum([1 for i in damaged_bridges_new if i <= 1743])

return damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges

看来问题是我忽略了销毁在步骤 (1) 和 (2) 中创建的服务器 -- 一个简单的修复!我只是在每个步骤的末尾添加了 job_server.destroy()。我目前正在 运行 模拟,并且已经顺利完成 1000 次模拟中的 250 次。

为了完全清楚,步骤 (1) 的代码现在是:

ppservers = ()    #starting a super cool parallelization
# Creates jobserver with automatically detected number of workers
job_server = pp.Server(ppservers=ppservers)

# set up jobs
jobs = []
for i in targets:
    jobs.append(job_server.submit(compute_damage, (lnsas[i%len(lnsas)], napa_dict, targets[i], i%sets, U[i%sets][:] ), modules = ('random', 'math', ), depfuncs = (damage_bridges, )))

# get the results that have already run
bridge_array_new = []
bridge_array_internal = []
indices_array = []
bridge_array_hwy_num = []
for job in jobs:
    (index, damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges_road) = job()
    bridge_array_internal.append(damaged_bridges_internal)
    bridge_array_new.append(damaged_bridges_new)
    indices_array.append(index)
    bridge_array_hwy_num.append(num_damaged_bridges_road)

job_server.destroy()