并行化数百万次 Numpy 函数迭代

Parallelizing Millions of Iterations of Numpy Functions

我运行在参数 sample 的 2000 万种不同组合上使用下面的函数 compare,其中 sample 是由 100 个 1 和0秒。

compare 将几个其他数组与 sample 一起使用,并使用它们执行一些点积,对这些点积求幂,然后将它们相互比较。这些其他阵列保持不变。

在我的笔记本电脑上,运行 完成所有 2000 万个组合需要大约一个小时。

我正在寻找让它运行得更快的方法。我愿意改进以下代码并使用 dask 等利用并行处理的库。

备注:

def compare(sample, competition_exp_dot, preferences): # 140 µs
    sample_exp_dot = np.exp(preferences @ sample) #30.3 µs
    all_competitors = np.append(sample_exp_dot.reshape(-1, 1), competition_exp_dot, 1) # 5 µs
    all_results = all_products/all_competitors.sum(axis=1)[:,None] #27.4 µs

    return all_results.mean(axis=0) #20.6 µs
#these inputs to the above function stay the same
preferences = np.random.random((1000,100))
competition = np.array([np.random.randint(0,2,100), np.random.randint(0,2,100)])
competition_exp_dot = np.exp(preferences @ competition.T)

# the function is run with 20,000,000 variations of sample
population = np.random.randint(0,2,(20000000,100))
result = [share_calc(sample, competition_exp_dot, preferences) for sample in population]

您可以考虑以下几点:

import torch
import numpy as np
x = np.array([[1,2,3],[4,5,6]])
b = torch.from_numpy(x)
if torch.cuda.is_available():
    device = torch.device("cuda")
b = b.to(device)

有很多方法可以加速像这样的简单数组编程代码:

  1. 你可以使用像Numba这样的工具,它会融合一些工作,同时也提供一些单节点多核并行的选项
  2. 您可以使用像 Dask 这样的工具将其扩展到单台机器的多个内核上(也可以使用 Numba)或跨集群
  3. 您可以使用 GPU 阵列库之一,例如 Torch、TensorFlow、CuPy 或 Jax 运行 在 GPU 上实现此功能

您也可以混合执行上述操作。

我按照 MRocklin 的建议实施了 numba。结果在我的机器上快了大约 4 倍。

修改后的 Numba 版本

import numba as nb

@nb.jit
def nb_compare(sample, competition_exp_dot, preferences):
    sample_exp_dot = np.exp(preferences @ sample)
    all_competitors = np.append(sample_exp_dot.reshape(-1, 1), competition_exp_dot, 1)
    all_results = (all_competitors.T/all_competitors.sum(axis=1)).T

    return np_mean(all_results, 0) # see source for np_mean in notes below

可比较的 Numpy 版本

import numpy as np
def np_compare(sample, competition_exp_dot, preferences):
    sample_exp_dot = np.exp(preferences @ sample)
    all_competitors = np.append(sample_exp_dot.reshape(-1, 1), competition_exp_dot, 1)
    all_results = (all_competitors.T/all_competitors.sum(axis=1)).T

    return all_results.mean(axis=0)

时序比较

设置:

preferences = np.random.random((1000,100)).astype(np.float32)
competition = np.array([np.random.randint(0,2,100), np.random.randint(0,2,100)]).astype(np.float32)
competition_exp_dot = np.exp(preferences @ competition.T)
sample = np.random.randint(0,2,100)
%timeit np_compare(sample, competition_exp_dot, preferences)
"210 µs ± 13.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)"


%timeit -n 10000 nb_compare(population[0], competition_exp_dot, preferences)
"52.4 µs ± 4.48 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)"

备注

Numba 不支持可选参数,例如 np.mean 和 returns 的轴等打字错误。在我的 numba 代码中,我使用 np_mean 以下版本的调用。

归功于 joelrich

import numba as nb, numpy as np

# fix to use np.mean along axis=0 (numba doesn't support optional arguments for np.mean)
# credit to: joelrich https://github.com/numba/numba/issues/1269#issuecomment-472574352
@nb.njit
def np_apply_along_axis(func1d, axis, arr):
  assert arr.ndim == 2
  assert axis in [0, 1]
  if axis == 0:
    result = np.empty(arr.shape[1])
    for i in range(len(result)):
      result[i] = func1d(arr[:, i])
  else:
    result = np.empty(arr.shape[0])
    for i in range(len(result)):
      result[i] = func1d(arr[i, :])
  return result

@nb.njit
def np_mean(array, axis):
  return np_apply_along_axis(np.mean, axis, array)