并行化三个嵌套循环
Parallelize three nested loops
上下文:
我有 3 个 3D 阵列(“precursor
阵列”),我正在使用反距离加权方法进行上采样。为此,我计算了一个 3D weights
数组,我在 precursor
数组的每个点上的 for 循环中使用该数组。
我的 weights
数组的每个二维切片用于计算 partial
数组。一旦我生成了所有 28 个,它们就会被加起来得到一个最终的 host
数组。
我想将这个 for 循环并行化以减少我的计算时间。我尝试这样做但我无法正确更新我的 host
数组。
问题:
如何并行化我的主要功能(代码的最后一部分)?
编辑:或者有没有一种方法可以“切片”我的 i
for 循环(例如一个核心 运行 在 i = 0 到 5 之间,一个核心 运行在 i = 6 到 9) ?
总结:
- 3
precursor
个数组(温度、降水、雪):10x4x7(10
是时间维度)
- 1
weight
数组(w):28x1101x2101
- 28x3
partial
阵列:1101x2101
- 3
host
个数组(temp、prec、Eprec):1101x2101
这是我的代码(可运行,因为它在 MAIN ALGORITHM PARALLEL
部分之外,请参阅末尾的 MAIN ALGORITHM NOT PARALLEL
部分以了解我的代码的非并行版本):
import numpy as np
import multiprocessing as mp
import time
#%% ------ Create data ------ ###
temperatures = np.random.rand(10,4,7)*100
precipitation = np.random.rand(10,4,7)
snow = np.random.rand(10,4,7)
# Array of altitudes to "adjust" the temperatures
alt = np.random.rand(4,7)*1000
#%% ------ Functions to run in parallel ------ ###
# This function upsamples the precursor arrays and creates the partial arrays
def interpolator(i, k, mx, my):
T = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000)) * w[k,:,:]
P = (precipitation[i,mx,my])*w[k,:,:]
S = (snow[i,mx,my])*w[k,:,:]
return(T, P, S)
# We add each partial array to each other to create the host array
def get_results(results):
global temp, prec, Eprec
temp += results[0]
prec += results[1]
Eprec += results[2]
#%% ------ IDW Interpolation ------ ###
# We create a weight matrix that we use to upsample our temperatures, precipitations and snow matrices
# This part is not that important, it works well as it is
MX,MY = np.shape(temperatures[0])
N = 300
T = np.zeros([N*MX+1, N*MY+1])
# create NxM inverse distance weight matrices based on Gaussian interpolation
x = np.arange(0,N*MX+1)
y = np.arange(0,N*MY+1)
X,Y = np.meshgrid(x,y)
k = 0
w = np.zeros([MX*MY,N*MX+1,N*MY+1])
for mx in range(MX):
for my in range(MY):
# Gaussian
add_point = np.exp(-((mx*N-X.T)**2+(my*N-Y.T)**2)/N**2)
w[k,:,:] += add_point
k += 1
sum_weights = np.sum(w, axis=0)
for k in range(MX*MY):
w[k,:,:] /= sum_weights
#%% ------ MAIN ALGORITHM PARALLEL ------ ###
if __name__ == '__main__':
# Create an empty array to use as a template
dummy = np.zeros((w.shape[1], w.shape[2]))
# Start a timer
ts = time.time()
# Iterate over the time dimension
for i in range(temperatures.shape[0]):
# Initialize the host arrays
temp = dummy.copy()
prec = dummy.copy()
Eprec = dummy.copy()
# Create the pool based on my amount of cores
pool = mp.Pool(mp.cpu_count())
# Loop through every weight slice, for every cell of the temperatures, precipitations and snow arrays
for k in range(0,w.shape[0]):
for mx in range(MX):
for my in range(MY):
# Upsample the temperatures, precipitations and snow arrays by adding the contribution of each weight slice
pool.apply_async(interpolator, args = (i, k, mx, my), callback = get_results)
pool.close()
pool.join()
# Print the time spent on the loop
print("Time spent: ", time.time()-ts)
#%% ------ MAIN ALGORITHM NOT PARALLEL ------ ###
if __name__ == '__main__':
# Create an empty array to use as a template
dummy = np.zeros((w.shape[1], w.shape[2]))
ts = time.time()
for i in range(temperatures.shape[0]):
# Create empty host arrays
temp = dummy.copy()
prec = dummy.copy()
Eprec = dummy.copy()
k = 0
for mx in range(MX):
for my in range(MY):
get_results(interpolator(i, k, mx, my))
k += 1
print("Time spent:", time.time()-ts)
多处理的问题是它创建了许多新进程 在 main 之前(即 if __name__ == '__main__'
之前)执行代码。这导致初始化非常缓慢(因为所有进程都这样做)并且大量 RAM 被无用。您当然应该将所有内容移到 main 中,或者如果可能的话移到函数中(这通常会导致执行速度更快,并且无论如何都是一种很好的软件工程实践,尤其是对于并行代码)。即使这样,多处理还有另一个大问题:inter-process 通信速度慢。一种解决方案是使用 multi-threaded 方法,通过使用 Numba 或 Cython 成为可能(您可以使用它们禁用 GIL,而不是基本的 CPython 线程)。事实上,它们通常比多处理更易于使用。但是,您应该更加小心,因为并行访问不受保护并且 data-races 可能出现在伪造的并行代码中。
在您的情况下,计算主要是 memory-bound。这意味着多处理非常无用。事实上,并行性在这里几乎没有用处,除非你是 运行 具有 high-throughput 的计算服务器上的此代码。事实上,内存是一种共享资源,使用更多的计算核心并没有多大帮助,因为 1 个核心几乎可以使普通 PC 上的内存带宽饱和(而计算服务器上需要的核心很少)。
加速memory-bound代码的关键是避免创建临时数组并使用cache-friendly算法.在你的例子中,T
、P
和 S
被填充只是为了稍后阅读以便更新 temp
、prec
和 Eprec
数组.这个临时步骤在这里非常昂贵且必要(尤其是填充数组)。删除它会增加 算术强度 ,从而导致代码在顺序上肯定会更快,并且在多核上可以更好地 scale。我的机器就是这样。
下面是使用 Numba 并行化代码的代码示例:
import numba as nb
# get_results + interpolator
@nb.njit('void(float64[:,::1], float64[:,::1], float64[:,::1], float64[:,:,::1], int_, int_, int_, int_)', parallel=True)
def interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my):
factor1 = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000))
factor2 = precipitation[i,mx,my]
factor3 = snow[i,mx,my]
for i in nb.prange(w.shape[1]):
for j in range(w.shape[2]):
val = w[k, i, j]
temp[i, j] += factor1 * val
prec[i, j] += factor2 * val
Eprec[i, j] += factor3 * val
# Example of usage:
interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my)
注意 nb.njit
中的字符串称为 signature 并为 JIT 指定类型,以便它可以立即编译它。
此代码在我的 6 核机器上快 4.6 倍(如果没有 get_results
和 interpolator
的合并,它的速度几乎没有提高)。事实上,它在顺序中快了 3.8 倍,因此线程并没有多大帮助,因为计算仍然是 memory-bound。事实上,multiply-add 的成本与内存 reads/writes.
相比可以忽略不计
上下文:
我有 3 个 3D 阵列(“precursor
阵列”),我正在使用反距离加权方法进行上采样。为此,我计算了一个 3D weights
数组,我在 precursor
数组的每个点上的 for 循环中使用该数组。
我的 weights
数组的每个二维切片用于计算 partial
数组。一旦我生成了所有 28 个,它们就会被加起来得到一个最终的 host
数组。
我想将这个 for 循环并行化以减少我的计算时间。我尝试这样做但我无法正确更新我的 host
数组。
问题:
如何并行化我的主要功能(代码的最后一部分)?
编辑:或者有没有一种方法可以“切片”我的 i
for 循环(例如一个核心 运行 在 i = 0 到 5 之间,一个核心 运行在 i = 6 到 9) ?
总结:
- 3
precursor
个数组(温度、降水、雪):10x4x7(10
是时间维度) - 1
weight
数组(w):28x1101x2101 - 28x3
partial
阵列:1101x2101 - 3
host
个数组(temp、prec、Eprec):1101x2101
这是我的代码(可运行,因为它在 MAIN ALGORITHM PARALLEL
部分之外,请参阅末尾的 MAIN ALGORITHM NOT PARALLEL
部分以了解我的代码的非并行版本):
import numpy as np
import multiprocessing as mp
import time
#%% ------ Create data ------ ###
temperatures = np.random.rand(10,4,7)*100
precipitation = np.random.rand(10,4,7)
snow = np.random.rand(10,4,7)
# Array of altitudes to "adjust" the temperatures
alt = np.random.rand(4,7)*1000
#%% ------ Functions to run in parallel ------ ###
# This function upsamples the precursor arrays and creates the partial arrays
def interpolator(i, k, mx, my):
T = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000)) * w[k,:,:]
P = (precipitation[i,mx,my])*w[k,:,:]
S = (snow[i,mx,my])*w[k,:,:]
return(T, P, S)
# We add each partial array to each other to create the host array
def get_results(results):
global temp, prec, Eprec
temp += results[0]
prec += results[1]
Eprec += results[2]
#%% ------ IDW Interpolation ------ ###
# We create a weight matrix that we use to upsample our temperatures, precipitations and snow matrices
# This part is not that important, it works well as it is
MX,MY = np.shape(temperatures[0])
N = 300
T = np.zeros([N*MX+1, N*MY+1])
# create NxM inverse distance weight matrices based on Gaussian interpolation
x = np.arange(0,N*MX+1)
y = np.arange(0,N*MY+1)
X,Y = np.meshgrid(x,y)
k = 0
w = np.zeros([MX*MY,N*MX+1,N*MY+1])
for mx in range(MX):
for my in range(MY):
# Gaussian
add_point = np.exp(-((mx*N-X.T)**2+(my*N-Y.T)**2)/N**2)
w[k,:,:] += add_point
k += 1
sum_weights = np.sum(w, axis=0)
for k in range(MX*MY):
w[k,:,:] /= sum_weights
#%% ------ MAIN ALGORITHM PARALLEL ------ ###
if __name__ == '__main__':
# Create an empty array to use as a template
dummy = np.zeros((w.shape[1], w.shape[2]))
# Start a timer
ts = time.time()
# Iterate over the time dimension
for i in range(temperatures.shape[0]):
# Initialize the host arrays
temp = dummy.copy()
prec = dummy.copy()
Eprec = dummy.copy()
# Create the pool based on my amount of cores
pool = mp.Pool(mp.cpu_count())
# Loop through every weight slice, for every cell of the temperatures, precipitations and snow arrays
for k in range(0,w.shape[0]):
for mx in range(MX):
for my in range(MY):
# Upsample the temperatures, precipitations and snow arrays by adding the contribution of each weight slice
pool.apply_async(interpolator, args = (i, k, mx, my), callback = get_results)
pool.close()
pool.join()
# Print the time spent on the loop
print("Time spent: ", time.time()-ts)
#%% ------ MAIN ALGORITHM NOT PARALLEL ------ ###
if __name__ == '__main__':
# Create an empty array to use as a template
dummy = np.zeros((w.shape[1], w.shape[2]))
ts = time.time()
for i in range(temperatures.shape[0]):
# Create empty host arrays
temp = dummy.copy()
prec = dummy.copy()
Eprec = dummy.copy()
k = 0
for mx in range(MX):
for my in range(MY):
get_results(interpolator(i, k, mx, my))
k += 1
print("Time spent:", time.time()-ts)
多处理的问题是它创建了许多新进程 在 main 之前(即 if __name__ == '__main__'
之前)执行代码。这导致初始化非常缓慢(因为所有进程都这样做)并且大量 RAM 被无用。您当然应该将所有内容移到 main 中,或者如果可能的话移到函数中(这通常会导致执行速度更快,并且无论如何都是一种很好的软件工程实践,尤其是对于并行代码)。即使这样,多处理还有另一个大问题:inter-process 通信速度慢。一种解决方案是使用 multi-threaded 方法,通过使用 Numba 或 Cython 成为可能(您可以使用它们禁用 GIL,而不是基本的 CPython 线程)。事实上,它们通常比多处理更易于使用。但是,您应该更加小心,因为并行访问不受保护并且 data-races 可能出现在伪造的并行代码中。
在您的情况下,计算主要是 memory-bound。这意味着多处理非常无用。事实上,并行性在这里几乎没有用处,除非你是 运行 具有 high-throughput 的计算服务器上的此代码。事实上,内存是一种共享资源,使用更多的计算核心并没有多大帮助,因为 1 个核心几乎可以使普通 PC 上的内存带宽饱和(而计算服务器上需要的核心很少)。
加速memory-bound代码的关键是避免创建临时数组并使用cache-friendly算法.在你的例子中,T
、P
和 S
被填充只是为了稍后阅读以便更新 temp
、prec
和 Eprec
数组.这个临时步骤在这里非常昂贵且必要(尤其是填充数组)。删除它会增加 算术强度 ,从而导致代码在顺序上肯定会更快,并且在多核上可以更好地 scale。我的机器就是这样。
下面是使用 Numba 并行化代码的代码示例:
import numba as nb
# get_results + interpolator
@nb.njit('void(float64[:,::1], float64[:,::1], float64[:,::1], float64[:,:,::1], int_, int_, int_, int_)', parallel=True)
def interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my):
factor1 = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000))
factor2 = precipitation[i,mx,my]
factor3 = snow[i,mx,my]
for i in nb.prange(w.shape[1]):
for j in range(w.shape[2]):
val = w[k, i, j]
temp[i, j] += factor1 * val
prec[i, j] += factor2 * val
Eprec[i, j] += factor3 * val
# Example of usage:
interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my)
注意 nb.njit
中的字符串称为 signature 并为 JIT 指定类型,以便它可以立即编译它。
此代码在我的 6 核机器上快 4.6 倍(如果没有 get_results
和 interpolator
的合并,它的速度几乎没有提高)。事实上,它在顺序中快了 3.8 倍,因此线程并没有多大帮助,因为计算仍然是 memory-bound。事实上,multiply-add 的成本与内存 reads/writes.