在 numpy 行上并行化循环
Parallelize loop over numpy rows
我需要对 numpy 数组中的每一行应用相同的函数,并将结果再次存储在 numpy 数组中。
# states will contain results of function applied to a row in array
states = np.empty_like(array)
for i, ar in enumerate(array):
states[i] = function(ar, *args)
# do some other stuff on states
function
对我的数据进行一些 非平凡的 过滤,并在条件为 True 和 False 时对数组进行 returns 过滤。 function
可以是纯 python 或 cython 编译的。对行的过滤操作很复杂,并且可能取决于行中的先前值,这意味着我无法以逐个元素的方式对整个数组进行操作
有没有办法在 dask 中做这样的事情?
将您的函数变成通用函数:http://docs.scipy.org/doc/numpy/reference/ufuncs.html。
然后:states = function(array, *args)
.
Dask 解决方案
您可以使用 dask.array 通过按行对数组进行分块,调用 map_blocks
,然后计算结果
ar = ...
x = da.from_array(ar, chunks=(1, arr.shape[1]))
x.map_blocks(function, *args)
states = x.compute()
默认使用线程,您可以通过以下方式使用进程
from dask.multiprocessing import get
states = x.compute(get=get)
池解
然而,对于像这样令人尴尬的并行计算,dask 可能有点矫枉过正,您可以使用线程池
from multiprocessing.pool import ThreadPool
pool = ThreadPool()
ar = ...
states = np.empty_like(array)
def f(i):
states[i] = function(ar[i], *args)
pool.map(f, range(len(ar)))
您可以通过以下更改切换到进程
from multiprocessing import Pool
pool = Pool()
我需要对 numpy 数组中的每一行应用相同的函数,并将结果再次存储在 numpy 数组中。
# states will contain results of function applied to a row in array
states = np.empty_like(array)
for i, ar in enumerate(array):
states[i] = function(ar, *args)
# do some other stuff on states
function
对我的数据进行一些 非平凡的 过滤,并在条件为 True 和 False 时对数组进行 returns 过滤。 function
可以是纯 python 或 cython 编译的。对行的过滤操作很复杂,并且可能取决于行中的先前值,这意味着我无法以逐个元素的方式对整个数组进行操作
有没有办法在 dask 中做这样的事情?
将您的函数变成通用函数:http://docs.scipy.org/doc/numpy/reference/ufuncs.html。
然后:states = function(array, *args)
.
Dask 解决方案
您可以使用 dask.array 通过按行对数组进行分块,调用 map_blocks
,然后计算结果
ar = ...
x = da.from_array(ar, chunks=(1, arr.shape[1]))
x.map_blocks(function, *args)
states = x.compute()
默认使用线程,您可以通过以下方式使用进程
from dask.multiprocessing import get
states = x.compute(get=get)
池解
然而,对于像这样令人尴尬的并行计算,dask 可能有点矫枉过正,您可以使用线程池
from multiprocessing.pool import ThreadPool
pool = ThreadPool()
ar = ...
states = np.empty_like(array)
def f(i):
states[i] = function(ar[i], *args)
pool.map(f, range(len(ar)))
您可以通过以下更改切换到进程
from multiprocessing import Pool
pool = Pool()