改进 Numba 中的并行化
Improving parallelization in Numba
我在 Numba 中有一个函数,它有几个可以并行化的循环。循环写入一个公共数组 K,因此我知道编译器可能没有尽可能多地进行优化。
但是,我不知道什么会使 numba 的 jit 编译器创建最高效的代码。文档中的示例过于简单,无法提供帮助。
我已经尝试将每个 range
更改为 prange
。在 k_x
上并行化循环时获得了最好的结果,但我在 4 核机器上只得到了 2.8 倍的改进。我知道我不应该期待线性性能提升,但我觉得在这种情况下我应该得到更好的结果。例如,我使用 dask 得到了更好的结果,x_cond.map_blocks(cond_expect_kernel, x_tr, *args)
考虑到调度程序开销,这很奇怪。
除了简单地将 range
更改为 prange
之外,还有什么方法可以改进此函数的并行化?
原函数
@jit(float64[:,:](float64[:,:], float64[:,:], int64, int64), nopython=True, nogil=True)
def cond_expect_kernel(x_cond, x_tr, degree, amount_non_cond_vars):
size = x_cond.shape[1]
x_tr_cond = x_tr[:, :size]
samples_x = x_cond.shape[0]
samples_tr = x_tr.shape[0]
K = (1+np.dot(x_cond, x_tr_cond.T))**degree
for j in range(size, size+amount_non_cond_vars):
for k_x in range(samples_x):
for k_x_tr in range(samples_tr):
K[k_x, k_x_tr] += x_tr[k_x_tr, j]**2*3
for j_left in range(size):
K[k_x, k_x_tr] += x_cond[k_x, j_left]*x_tr[k_x_tr, j_left]*x_tr[k_x_tr, j] ** 2 *3
return K
迄今为止最好的并行版本:
@jit(float64[:,:](float64[:,:], float64[:,:], int64, int64), nopython=True, parallel=True)
def cond_expect_kernel_parallel(x_cond, x_tr, degree, amount_non_cond_vars):
size = x_cond.shape[1]
x_tr_cond = x_tr[:, :size]
samples_x = x_cond.shape[0]
samples_tr = x_tr.shape[0]
K = (1+np.dot(x_cond, x_tr_cond.T))**degree
for j in range(size, size+amount_non_cond_vars):
for k_x in prange(samples_x):
for k_x_tr in range(samples_tr):
K[k_x, k_x_tr] += x_tr[k_x_tr, j]**2*3
for j_left in range(size):
K[k_x, k_x_tr] += x_cond[k_x, j_left]*x_tr[k_x_tr, j_left]*x_tr[k_x_tr, j] ** 2 *3
return K
作为参考,我正在使用一台 4 核机器和另一台 16 核机器。 samples_x
约10万,samples_tr
约5万,size
约3,amount_non_cond_vars
约100。
谢谢!
您的代码存在一些性能关键问题。
- 您将输入和输出数组声明为非连续数组,例如。这将是 C 连续数组
nb.float64[:,::1]
的声明。这通常会阻止 SIMD 向量化,并且在许多情况下会导致性能下降。如果您不确定数组是否是 C 连续的,请不要声明它,Numba 可以自己做。
- 如果您的代码中有一些缩减,请使用标量求和变量并将末尾的结果复制到数组中。必须避免不必要的 reading/writing 共享数组。
- 想想你的内存访问模式/循环顺序。如果你在这里做错了什么,你很早就会陷入内存瓶颈。
- 如果像
size
这样的东西经常是3,你可以为这个问题写一个专门的版本。 (在这种情况下手动循环展开)。如果出现特殊情况,您可以使用小型包装函数进行检查。
例子
import numpy as np
import time
import llvmlite.binding as llvm
llvm.set_option('', '--debug-only=loop-vectorize')
@nb.njit(nb.float64[:,:](nb.float64[:,:], nb.float64[:,:], nb.int64, nb.int64),fastmath=True,parallel=True)
def cond_expect_kernel_gen(x_cond, x_tr, degree, amount_non_cond_vars):
x_tr_cond = x_tr[:,:x_cond.shape[1]]
K = np.dot(x_cond, x_tr_cond.T)
for k_x in nb.prange(x_cond.shape[0]):
for k_x_tr in range(x_tr.shape[0]):
sum=(K[k_x, k_x_tr]+1)**degree
for j in range(x_cond.shape[1], x_cond.shape[1]+amount_non_cond_vars):
sum += x_tr[k_x_tr, j]**2*3
for j_left in range(x_cond.shape[1]):
sum += x_cond[k_x, j_left]*x_tr[k_x_tr, j_left]*x_tr[k_x_tr, j] ** 2 *3
K[k_x, k_x_tr]=sum
return K
@nb.njit(nb.float64[:,::1](nb.float64[:,::1], nb.float64[:,::1], nb.int64, nb.int64),fastmath=True,parallel=True)
def cond_expect_kernel_3(x_cond, x_tr, degree, amount_non_cond_vars):
assert x_cond.shape[1]==3
x_tr_cond = x_tr[:,:x_cond.shape[1]]
K = np.dot(x_cond, x_tr_cond.T)
for k_x in nb.prange(x_cond.shape[0]):
for k_x_tr in range(x_tr.shape[0]):
sum=(K[k_x, k_x_tr]+1)**degree
for j in range(x_cond.shape[1], x_cond.shape[1]+amount_non_cond_vars):
sum += x_tr[k_x_tr, j]**2*3
sum_2=0.
sum_2 += x_cond[k_x, 0]*x_tr[k_x_tr, 0]
sum_2 += x_cond[k_x, 1]*x_tr[k_x_tr, 1]
sum_2 += x_cond[k_x, 2]*x_tr[k_x_tr, 2]
sum+=sum_2*x_tr[k_x_tr, j] ** 2 *3
K[k_x, k_x_tr]=sum
return K
性能
x_cond=np.random.rand(10_000,3)
x_tr=np.random.rand(5_000,103)
amount_non_cond_vars=100
degree=3
t1=time.time()
res_1=cond_expect_kernel_gen(x_cond, x_tr, degree, amount_non_cond_vars)
print(time.time()-t1)
t1=time.time()
res_2=cond_expect_kernel_3(x_cond, x_tr, degree, amount_non_cond_vars)
print(time.time()-t1)
(Quadcore i7, Numba 0.40dev)
your version, single threaded: 40s
your version, parallel: 8.61s
mod_general:3.8s
mod_3: 1.35s
我在 Numba 中有一个函数,它有几个可以并行化的循环。循环写入一个公共数组 K,因此我知道编译器可能没有尽可能多地进行优化。 但是,我不知道什么会使 numba 的 jit 编译器创建最高效的代码。文档中的示例过于简单,无法提供帮助。
我已经尝试将每个 range
更改为 prange
。在 k_x
上并行化循环时获得了最好的结果,但我在 4 核机器上只得到了 2.8 倍的改进。我知道我不应该期待线性性能提升,但我觉得在这种情况下我应该得到更好的结果。例如,我使用 dask 得到了更好的结果,x_cond.map_blocks(cond_expect_kernel, x_tr, *args)
考虑到调度程序开销,这很奇怪。
除了简单地将 range
更改为 prange
之外,还有什么方法可以改进此函数的并行化?
原函数
@jit(float64[:,:](float64[:,:], float64[:,:], int64, int64), nopython=True, nogil=True)
def cond_expect_kernel(x_cond, x_tr, degree, amount_non_cond_vars):
size = x_cond.shape[1]
x_tr_cond = x_tr[:, :size]
samples_x = x_cond.shape[0]
samples_tr = x_tr.shape[0]
K = (1+np.dot(x_cond, x_tr_cond.T))**degree
for j in range(size, size+amount_non_cond_vars):
for k_x in range(samples_x):
for k_x_tr in range(samples_tr):
K[k_x, k_x_tr] += x_tr[k_x_tr, j]**2*3
for j_left in range(size):
K[k_x, k_x_tr] += x_cond[k_x, j_left]*x_tr[k_x_tr, j_left]*x_tr[k_x_tr, j] ** 2 *3
return K
迄今为止最好的并行版本:
@jit(float64[:,:](float64[:,:], float64[:,:], int64, int64), nopython=True, parallel=True)
def cond_expect_kernel_parallel(x_cond, x_tr, degree, amount_non_cond_vars):
size = x_cond.shape[1]
x_tr_cond = x_tr[:, :size]
samples_x = x_cond.shape[0]
samples_tr = x_tr.shape[0]
K = (1+np.dot(x_cond, x_tr_cond.T))**degree
for j in range(size, size+amount_non_cond_vars):
for k_x in prange(samples_x):
for k_x_tr in range(samples_tr):
K[k_x, k_x_tr] += x_tr[k_x_tr, j]**2*3
for j_left in range(size):
K[k_x, k_x_tr] += x_cond[k_x, j_left]*x_tr[k_x_tr, j_left]*x_tr[k_x_tr, j] ** 2 *3
return K
作为参考,我正在使用一台 4 核机器和另一台 16 核机器。 samples_x
约10万,samples_tr
约5万,size
约3,amount_non_cond_vars
约100。
谢谢!
您的代码存在一些性能关键问题。
- 您将输入和输出数组声明为非连续数组,例如。这将是 C 连续数组
nb.float64[:,::1]
的声明。这通常会阻止 SIMD 向量化,并且在许多情况下会导致性能下降。如果您不确定数组是否是 C 连续的,请不要声明它,Numba 可以自己做。 - 如果您的代码中有一些缩减,请使用标量求和变量并将末尾的结果复制到数组中。必须避免不必要的 reading/writing 共享数组。
- 想想你的内存访问模式/循环顺序。如果你在这里做错了什么,你很早就会陷入内存瓶颈。
- 如果像
size
这样的东西经常是3,你可以为这个问题写一个专门的版本。 (在这种情况下手动循环展开)。如果出现特殊情况,您可以使用小型包装函数进行检查。
例子
import numpy as np
import time
import llvmlite.binding as llvm
llvm.set_option('', '--debug-only=loop-vectorize')
@nb.njit(nb.float64[:,:](nb.float64[:,:], nb.float64[:,:], nb.int64, nb.int64),fastmath=True,parallel=True)
def cond_expect_kernel_gen(x_cond, x_tr, degree, amount_non_cond_vars):
x_tr_cond = x_tr[:,:x_cond.shape[1]]
K = np.dot(x_cond, x_tr_cond.T)
for k_x in nb.prange(x_cond.shape[0]):
for k_x_tr in range(x_tr.shape[0]):
sum=(K[k_x, k_x_tr]+1)**degree
for j in range(x_cond.shape[1], x_cond.shape[1]+amount_non_cond_vars):
sum += x_tr[k_x_tr, j]**2*3
for j_left in range(x_cond.shape[1]):
sum += x_cond[k_x, j_left]*x_tr[k_x_tr, j_left]*x_tr[k_x_tr, j] ** 2 *3
K[k_x, k_x_tr]=sum
return K
@nb.njit(nb.float64[:,::1](nb.float64[:,::1], nb.float64[:,::1], nb.int64, nb.int64),fastmath=True,parallel=True)
def cond_expect_kernel_3(x_cond, x_tr, degree, amount_non_cond_vars):
assert x_cond.shape[1]==3
x_tr_cond = x_tr[:,:x_cond.shape[1]]
K = np.dot(x_cond, x_tr_cond.T)
for k_x in nb.prange(x_cond.shape[0]):
for k_x_tr in range(x_tr.shape[0]):
sum=(K[k_x, k_x_tr]+1)**degree
for j in range(x_cond.shape[1], x_cond.shape[1]+amount_non_cond_vars):
sum += x_tr[k_x_tr, j]**2*3
sum_2=0.
sum_2 += x_cond[k_x, 0]*x_tr[k_x_tr, 0]
sum_2 += x_cond[k_x, 1]*x_tr[k_x_tr, 1]
sum_2 += x_cond[k_x, 2]*x_tr[k_x_tr, 2]
sum+=sum_2*x_tr[k_x_tr, j] ** 2 *3
K[k_x, k_x_tr]=sum
return K
性能
x_cond=np.random.rand(10_000,3)
x_tr=np.random.rand(5_000,103)
amount_non_cond_vars=100
degree=3
t1=time.time()
res_1=cond_expect_kernel_gen(x_cond, x_tr, degree, amount_non_cond_vars)
print(time.time()-t1)
t1=time.time()
res_2=cond_expect_kernel_3(x_cond, x_tr, degree, amount_non_cond_vars)
print(time.time()-t1)
(Quadcore i7, Numba 0.40dev)
your version, single threaded: 40s
your version, parallel: 8.61s
mod_general:3.8s
mod_3: 1.35s