并行化三个嵌套循环

Parallelize three nested loops

上下文:

我有 3 个 3D 阵列(“precursor 阵列”),我正在使用反距离加权方法进行上采样。为此,我计算了一个 3D weights 数组,我在 precursor 数组的每个点上的 for 循环中使用该数组。

我的 weights 数组的每个二维切片用于计算 partial 数组。一旦我生成了所有 28 个,它们就会被加起来得到一个最终的 host 数组。

我想将这个 for 循环并行化以减少我的计算时间。我尝试这样做但我无法正确更新我的 host 数组。

问题:

如何并行化我的主要功能(代码的最后一部分)?

编辑:或者有没有一种方法可以“切片”我的 i for 循环(例如一个核心 运行 在 i = 0 到 5 之间,一个核心 运行在 i = 6 到 9) ?

总结:

这是我的代码(可运行,因为它在 MAIN ALGORITHM PARALLEL 部分之外,请参阅末尾的 MAIN ALGORITHM NOT PARALLEL 部分以了解我的代码的非并行版本):

import numpy as np
import multiprocessing as mp
import time



#%% ------ Create data ------ ###
temperatures = np.random.rand(10,4,7)*100
precipitation = np.random.rand(10,4,7)
snow = np.random.rand(10,4,7)

# Array of altitudes to "adjust" the temperatures
alt = np.random.rand(4,7)*1000



#%% ------ Functions to run in parallel ------ ###

# This function upsamples the precursor arrays and creates the partial arrays 
def interpolator(i, k, mx, my):
    
    T = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000)) * w[k,:,:]
    P = (precipitation[i,mx,my])*w[k,:,:]
    S = (snow[i,mx,my])*w[k,:,:]
    
    return(T, P, S)

# We add each partial array to each other to create the host array
def get_results(results):
    global temp, prec, Eprec
    temp += results[0]
    prec += results[1]
    Eprec += results[2]



#%% ------ IDW Interpolation ------ ###

# We create a weight matrix that we use to upsample our temperatures, precipitations and snow matrices
# This part is not that important, it works well as it is

MX,MY = np.shape(temperatures[0])
N = 300

T = np.zeros([N*MX+1, N*MY+1])

# create NxM inverse distance weight matrices based on Gaussian interpolation

x = np.arange(0,N*MX+1)
y = np.arange(0,N*MY+1)
X,Y = np.meshgrid(x,y)

k = 0
w =  np.zeros([MX*MY,N*MX+1,N*MY+1])
for mx in range(MX):
    for my in range(MY):
        
        # Gaussian
        add_point = np.exp(-((mx*N-X.T)**2+(my*N-Y.T)**2)/N**2)
        w[k,:,:] += add_point
        k += 1
        
sum_weights = np.sum(w, axis=0)
for k in range(MX*MY):
    w[k,:,:] /= sum_weights
    
    

#%% ------ MAIN ALGORITHM PARALLEL ------ ###

if __name__ == '__main__':
    
    # Create an empty array to use as a template
    dummy = np.zeros((w.shape[1], w.shape[2]))
    
    # Start  a timer
    ts = time.time()
    
    # Iterate over the time dimension
    for i in range(temperatures.shape[0]):
        
        # Initialize the host arrays
        temp = dummy.copy()
        prec = dummy.copy()
        Eprec = dummy.copy()
    
        # Create the pool based on my amount of cores
        pool = mp.Pool(mp.cpu_count())
        
        # Loop through every weight slice, for every cell of the temperatures, precipitations and snow arrays
        for k in range(0,w.shape[0]):
            for mx in range(MX):
                for my in range(MY):
                    
                    # Upsample the temperatures, precipitations and snow arrays by adding the contribution of each weight slice
                    pool.apply_async(interpolator, args = (i, k, mx, my), callback = get_results)
        
        pool.close()
        pool.join()
    
    # Print the time spent on the loop
    print("Time spent: ", time.time()-ts)
    
    

#%% ------ MAIN ALGORITHM NOT PARALLEL ------ ###

if __name__ == '__main__':
    
    # Create an empty array to use as a template
    dummy = np.zeros((w.shape[1], w.shape[2]))
    ts = time.time()
    
    for i in range(temperatures.shape[0]):
        
        # Create empty host arrays
        temp = dummy.copy()
        prec = dummy.copy()
        Eprec = dummy.copy()
        
        k = 0
        for mx in range(MX):
            for my in range(MY):
                
                get_results(interpolator(i, k, mx, my))
                k += 1
                
    print("Time spent:", time.time()-ts)

多处理的问题是它创建了许多新进程 在 main 之前(即 if __name__ == '__main__' 之前)执行代码。这导致初始化非常缓慢(因为所有进程都这样做)并且大量 RAM 被无用。您当然应该将所有内容移到 main 中,或者如果可能的话移到函数中(这通常会导致执行速度更快,并且无论如何都是一种很好的软件工程实践,尤其是对于并行代码)。即使这样,多处理还有另一个大问题:inter-process 通信速度慢。一种解决方案是使用 multi-threaded 方法,通过使用 Numba 或 Cython 成为可能(您可以使用它们禁用 GIL,而不是基本的 CPython 线程)。事实上,它们通常比多处理更易于使用。但是,您应该更加小心,因为并行访问不受保护并且 data-races 可能出现在伪造的并行代码中。

在您的情况下,计算主要是 memory-bound。这意味着多处理非常无用。事实上,并行性在这里几乎没有用处,除非你是 运行 具有 high-throughput 的计算服务器上的此代码。事实上,内存是一种共享资源,使用更多的计算核心并没有多大帮助,因为 1 个核心几乎可以使普通 PC 上的内存带宽饱和(而计算服务器上需要的核心很少)。

加速memory-bound代码的关键是避免创建临时数组并使用cache-friendly算法.在你的例子中,TPS 被填充只是为了稍后阅读以便更新 tempprecEprec 数组.这个临时步骤在这里非常昂贵且必要(尤其是填充数组)。删除它会增加 算术强度 ,从而导致代码在顺序上肯定会更快,并且在多核上可以更好地 scale。我的机器就是这样。

下面是使用 Numba 并行化代码的代码示例:

import numba as nb

# get_results + interpolator
@nb.njit('void(float64[:,::1], float64[:,::1], float64[:,::1], float64[:,:,::1], int_, int_, int_, int_)', parallel=True)
def interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my):
    factor1 = ((temperatures[i,mx,my]-272.15) + (-alt[mx, my] * -6/1000))
    factor2 = precipitation[i,mx,my]
    factor3 = snow[i,mx,my]
    for i in nb.prange(w.shape[1]):
        for j in range(w.shape[2]):
            val = w[k, i, j]
            temp[i, j] += factor1 * val
            prec[i, j] += factor2 * val
            Eprec[i, j] += factor3 * val

# Example of usage:
interpolate_and_get_results(temp, prec, Eprec, w, i, k, mx, my)

注意 nb.njit 中的字符串称为 signature 并为 JIT 指定类型,以便它可以立即编译它。

此代码在我的 6 核机器上快 4.6 倍(如果没有 get_resultsinterpolator 的合并,它的速度几乎没有提高)。事实上,它在顺序中快了 3.8 倍,因此线程并没有多大帮助,因为计算仍然是 memory-bound。事实上,multiply-add 的成本与内存 reads/writes.

相比可以忽略不计