同时并行读取 numpy 数组
Concurrently reading numpy arrays in parallel
考虑以下几点:
fine = np.random.uniform(0,100,10)
fine[fine<20] = 0 # introduce some intermittency
coarse = np.sum(fine.reshape(-1,2),axis=1)
fine
是量级的时间序列(例如降雨量)。 coarse
是相同的时间序列,但分辨率减半,因此 fine
中的每 2 个时间步都聚合为 coarse
中的单个值。
然后我对确定 coarse
大小比例的权重感兴趣,该权重对应于 fine
中的每个时间步长,其中 coarse
的值高于零。
def w_xx(fine, coarse):
weights = []
for i, val in enumerate(coarse):
if val > 0:
w = fine[i*2:i*2+2]/val # returns both w1 and w2, w1 is 1st element, w2 = 1-w1 is second
weights.append(w)
return np.asarray(weights)
所以 w_xx(fine,coarse)
会 return 一个形状为 5,2
的数组,其中 axis=1
的元素是 fine
的权重 coarse
.
这对于较小的时间序列来说都很好,但我运行在 ~60k 大小的 fine
数组上进行此分析,外加 300 多次迭代的循环。
我一直在尝试使用 Python2.7 中的 multiprocessing
库并行制作此 运行,但我没有成功。我需要同时读取两个时间序列,以便为 coarse
中的每个值获取相应的 fine
值,并且仅适用于大于 0 的值,这是我的分析所需要的.
如果能提供更好的方法,我将不胜感激。我想如果我可以定义一个映射函数来与 multiprocessing
中的 Pool.map
一起使用,我应该能够并行化它吗?我才刚刚开始 multiprocessing
所以我不知道是否有其他方法?
谢谢。
您只需执行以下操作即可以矢量化形式获得相同的结果:
>>> (fine / np.repeat(coarse, 2)).reshape(-1, 2)
然后您可以使用 np.isfinite
过滤掉 coarse
为零的行,因为如果 coarse
为零,则输出为 inf
或 nan
.
太棒了!我不知道np.repeat
,非常感谢。
为了以提出的形式回答我最初的问题,我还设法通过 multiprocessing
:
完成了这项工作
import numpy as np
from multiprocessing import Pool
fine = np.random.uniform(0,100,100000)
fine[fine<20] = 0
coarse = np.sum(fine.reshape(-1,2),axis=1)
def wfunc(zipped):
return zipped[0]/zipped[1]
def wpar(zipped, processes):
p = Pool(processes)
calc = np.asarray(p.map(wfunc, zip(fine,np.repeat(coarse,2))))
p.close()
p.join()
return calc[np.isfinite(calc)].reshape(-1,2)
不过@behzad.nouri的建议显然更好:
def w_opt(fine, coarse):
w = (fine / np.repeat(coarse, 2))
return w[np.isfinite(w)].reshape(-1,2)
#using some iPython magic
%timeit w_opt(fine,coarse)
1000 loops, best of 3: 1.88 ms per loop
%timeit w_xx(fine,coarse)
1 loops, best of 3: 342 ms per loop
%timeit wpar(zip(fine,np.repeat(coarse,2)),6) #I've 6 cores at my disposal
1 loops, best of 3: 1.76 s per loop
再次感谢!
除了@behzad.nouri 提出的 NumPy 表达式之外,您还可以使用 Pythran 编译器获得额外的加速:
$ cat w_xx.py
#pythran export w_xx(float[], float[])
import numpy as np
def w_xx(fine, coarse):
w = (fine / np.repeat(coarse, 2))
return w[np.isfinite(w)].reshape(-1, 2)
$ python -m timeit -s 'import numpy as np; fine = np.random.uniform(0, 100, 100000); fine[fine<20] = 0; coarse = np.sum(fine.reshape(-1, 2), axis=1); from w_xx import w_xx' 'w_xx(fine, coarse)'
1000 loops, best of 3: 1.5 msec per loop
$ pythran w_xx.py -fopenmp -march=native # yes, this generates parallel code
$ python -m timeit -s 'import numpy as np; fine = np.random.uniform(0, 100, 100000); fine[fine<20] = 0; coarse = np.sum(fine.reshape(-1, 2), axis=1); from w_xx import w_xx' 'w_xx(fine, coarse)'
1000 loops, best of 3: 867 usec per loop
免责声明:我是 Pythran 开发人员。
考虑以下几点:
fine = np.random.uniform(0,100,10)
fine[fine<20] = 0 # introduce some intermittency
coarse = np.sum(fine.reshape(-1,2),axis=1)
fine
是量级的时间序列(例如降雨量)。 coarse
是相同的时间序列,但分辨率减半,因此 fine
中的每 2 个时间步都聚合为 coarse
中的单个值。
然后我对确定 coarse
大小比例的权重感兴趣,该权重对应于 fine
中的每个时间步长,其中 coarse
的值高于零。
def w_xx(fine, coarse):
weights = []
for i, val in enumerate(coarse):
if val > 0:
w = fine[i*2:i*2+2]/val # returns both w1 and w2, w1 is 1st element, w2 = 1-w1 is second
weights.append(w)
return np.asarray(weights)
所以 w_xx(fine,coarse)
会 return 一个形状为 5,2
的数组,其中 axis=1
的元素是 fine
的权重 coarse
.
这对于较小的时间序列来说都很好,但我运行在 ~60k 大小的 fine
数组上进行此分析,外加 300 多次迭代的循环。
我一直在尝试使用 Python2.7 中的 multiprocessing
库并行制作此 运行,但我没有成功。我需要同时读取两个时间序列,以便为 coarse
中的每个值获取相应的 fine
值,并且仅适用于大于 0 的值,这是我的分析所需要的.
如果能提供更好的方法,我将不胜感激。我想如果我可以定义一个映射函数来与 multiprocessing
中的 Pool.map
一起使用,我应该能够并行化它吗?我才刚刚开始 multiprocessing
所以我不知道是否有其他方法?
谢谢。
您只需执行以下操作即可以矢量化形式获得相同的结果:
>>> (fine / np.repeat(coarse, 2)).reshape(-1, 2)
然后您可以使用 np.isfinite
过滤掉 coarse
为零的行,因为如果 coarse
为零,则输出为 inf
或 nan
.
太棒了!我不知道np.repeat
,非常感谢。
为了以提出的形式回答我最初的问题,我还设法通过 multiprocessing
:
import numpy as np
from multiprocessing import Pool
fine = np.random.uniform(0,100,100000)
fine[fine<20] = 0
coarse = np.sum(fine.reshape(-1,2),axis=1)
def wfunc(zipped):
return zipped[0]/zipped[1]
def wpar(zipped, processes):
p = Pool(processes)
calc = np.asarray(p.map(wfunc, zip(fine,np.repeat(coarse,2))))
p.close()
p.join()
return calc[np.isfinite(calc)].reshape(-1,2)
不过@behzad.nouri的建议显然更好:
def w_opt(fine, coarse):
w = (fine / np.repeat(coarse, 2))
return w[np.isfinite(w)].reshape(-1,2)
#using some iPython magic
%timeit w_opt(fine,coarse)
1000 loops, best of 3: 1.88 ms per loop
%timeit w_xx(fine,coarse)
1 loops, best of 3: 342 ms per loop
%timeit wpar(zip(fine,np.repeat(coarse,2)),6) #I've 6 cores at my disposal
1 loops, best of 3: 1.76 s per loop
再次感谢!
除了@behzad.nouri 提出的 NumPy 表达式之外,您还可以使用 Pythran 编译器获得额外的加速:
$ cat w_xx.py
#pythran export w_xx(float[], float[])
import numpy as np
def w_xx(fine, coarse):
w = (fine / np.repeat(coarse, 2))
return w[np.isfinite(w)].reshape(-1, 2)
$ python -m timeit -s 'import numpy as np; fine = np.random.uniform(0, 100, 100000); fine[fine<20] = 0; coarse = np.sum(fine.reshape(-1, 2), axis=1); from w_xx import w_xx' 'w_xx(fine, coarse)'
1000 loops, best of 3: 1.5 msec per loop
$ pythran w_xx.py -fopenmp -march=native # yes, this generates parallel code
$ python -m timeit -s 'import numpy as np; fine = np.random.uniform(0, 100, 100000); fine[fine<20] = 0; coarse = np.sum(fine.reshape(-1, 2), axis=1); from w_xx import w_xx' 'w_xx(fine, coarse)'
1000 loops, best of 3: 867 usec per loop
免责声明:我是 Pythran 开发人员。