在 Python 中用嵌套循环重写 MATLAB 代码并快速执行

Rewriting MATLAB code with nested loops in Python with fast execution

这是一个嵌套循环,其中内部索引依赖于外部索引,参数如下:

f = rand(1,70299)
nech=24*30*24
N=length(f);
xh=(1:nech)/24;

在 MATLAB 中:

sf2(1:nech)=0.;
sf2vel(1:nech)=0.;
count(1:nech)=0.;

for i=1:nech
    for j=1:N-i-1
        sf2(i)=sf2(i)+(f(j+i)-f(j))^2;
        count(i)=count(i)+1; 
    end
    sf2(i)=sf2(i)/count(i);
end

在Python中:

def structFunPython(f,N,nech):
    sf2 = np.zeros(N)
    count = np.zeros(N)
    for i in range(nech):
        indN = np.arange(1,N-i-1)
        for j in indN:
            sf2[i] += np.power((f[i+j]-f[j]),2)
            count[i] += 1
        sf2[i] = sf2[i]/count[i]
    return sf2

使用 cython:

import cython
cimport numpy as np
import numpy as np
def structFun(np.ndarray f,N,nech):
    cdef np.ndarray sf2 = np.zeros(N), count = np.zeros(N),
    for i in range(nech):
        indN = np.arange(1,N-i-1)
        for j in indN:
            sf2[i] += np.power((f[i+j]-f[j]),2)
            count[i] += 1
        sf2[i] = sf2[i]/count[i]
    return sf2

执行次数:

Matlab: 7.8377 sec
Python: 3651.35 sec
Cython: 3336.21 sec

我很难相信 Python 和 Cython(尤其是 Cython)对于相同的计算如此缓慢,所以我想我一定是在我的 Python/Cython 循环中犯了一个错误,但我看不到哪里。

我将 MATLAB 代码重写为等效 Python 代码的方式可能是(注意索引在 MATLAB 中从 1 开始,在 Python 中从 0 开始...因为我不知道这是怎么回事应该在没有上下文的情况下进行调整我采用了最简单的方法):

import numpy as np


def func(f_arr, nech, n):
    sf2 = np.zeros(nech)
    count = np.zeros(nech)
    for i in range(nech):
        for j in range(n - i):
            sf2[i] += (f_arr[i + j] - f_arr[i]) ** 2
            count[i] += 1
        sf2[i] /= count[i]
    return sf2

注意count[i] += 1是没有用的,因为count[i]的最终值是预先知道的,实际上整个count是没有用的,例如:

import numpy as np


def func2(f_arr, nech, n):
    sf2 = np.zeros(nech)
    for i in range(nech):
        for j in range(n - i):
            sf2[i] += (f_arr[i + j] - f_arr[i]) ** 2
        sf2[i] /= (n - i)
    return sf2

Speed-up

这是 Numba 加速的手动案例。这就像 adding/using Numba @njit 装饰器一样简单:

import numba as nb


func_nb = nb.njit(func)
func2_nb = nb.njit(func2)

现在,funcfunc_nbfunc2func2_nb 都执行相同的计算:

nech = n = 6
f_arr = np.arange(n)
print(func(f_arr, nech, n))
# [9.16666667 6.         3.5        1.66666667 0.5        0.        ]
print(func_nb(f_arr, nech, n))
# [9.16666667 6.         3.5        1.66666667 0.5        0.        ]
print(func2(f_arr, nech, n))
# [9.16666667 6.         3.5        1.66666667 0.5        0.        ]
print(func2_nb(f_arr, nech, n))
# [9.16666667 6.         3.5        1.66666667 0.5        0.        ]

如果你真的需要坚持使用 Cython,这里有一个基于 func2:

的实现
%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True


import numpy as np
import cython as cy


cpdef func2_cy(f_arr, nech, n):
    sf2 = np.zeros(nech)
    _func2_cy(f_arr.astype(np.float_), sf2, nech, n)
    return sf2


cdef _func2_cy(double[:] f_arr, double[:] sf2, cy.int nech, cy.int n):
    for i in range(nech):
        for j in range(1, n - i):
            sf2[i] = sf2[i] + (f_arr[i + j] - f_arr[i]) ** 2
        sf2[i] = sf2[i] / (n - i)

与 Numba 相比,它的编写要复杂得多,但性能相似。 诀窍是拥有一个函数 _func2_cy ,它与 Python 几乎没有交互(阅读:它以 C 速度运行)。

结果与 func2:

相同
nech = n = 6
f_arr = np.arange(n)
print(func2_cy(f_arr, nech, n))
# [9.16666667 6.         3.5        1.66666667 0.5        0.        ]

时间

通过一些小玩具基准测试,我们得到了 speed-ups 的感觉,包括像您一样编写的类似函数,以及 中提出的矢量化解决方案(但修复索引以匹配以上):

def func_OP(f, nech, n):
    sf2 = np.zeros(n)
    count = np.zeros(n)
    for i in range(nech):
        indN = np.arange(n - i)  # <-- indexing fixed
        for j in indN:
            sf2[i] += np.power((f[i+j]-f[i]),2)
            count[i] += 1
        sf2[i] = sf2[i] / count[i]
    return sf2


func_OP_nb = nb.njit(func_OP)
def func_pdisty(f, nech, n):
    res = np.zeros(nech)
    dists = scipy.spatial.distance.pdist(f[:, None], metric='sqeuclidean')
    counts = np.arange(n - 1, n - nech - 1, -1)
    inds = np.repeat(np.arange(res.size), counts)
    np.add.at(res, inds, dists[:inds.size])
    res /= (counts + 1)
    return res
nech = n = 6
f_arr = np.arange(n)
print(func_OP(f_arr, nech, n))
# [9.16666667 6.         3.5        1.66666667 0.5        0.        ]
print(func_pdisty(f_arr, nech, n))
# [9.16666667 6.         3.5        1.66666667 0.5        0.        ]
nech = n = 1000
f_arr = np.arange(n)
%timeit func_OP(f_arr, nech, n)
# 1 loop, best of 5: 1.5 s per loop
%timeit func(f_arr, nech, n)
# 1 loop, best of 5: 567 ms per loop
%timeit func2(f_arr, nech, n)
# 1 loop, best of 5: 352 ms per loop
%timeit func_OP_nb(f_arr, nech, n)
# 1000 loops, best of 5: 1.87 ms per loop
%timeit func_nb(f_arr, nech, n)
# 1000 loops, best of 5: 1.7 ms per loop
%timeit func2_nb(f_arr, nech, n)
# 1000 loops, best of 5: 768 µs per loop
%timeit func_pdisty(f_arr, nech, n)
# 10 loops, best of 5: 44.5 ms per loop
%timeit func2_cy(f_arr, nech, n)
# 1000 loops, best of 5: 1 ms per loop
nech = n = 2000
f_arr = np.arange(n)
%timeit func_OP(f_arr, nech, n)
# 1 loop, best of 5: 6.01 s per loop
%timeit func(f_arr, nech, n)
# 1 loop, best of 5: 2.3 s per loop
%timeit func2(f_arr, nech, n)
# 1 loop, best of 5: 1.42 s per loop
%timeit func_OP_nb(f_arr, nech, n)
# 100 loops, best of 5: 7.31 ms per loop
%timeit func_nb(f_arr, nech, n)
# 100 loops, best of 5: 6.82 ms per loop
%timeit func2_nb(f_arr, nech, n)
# 100 loops, best of 5: 3.05 ms per loop
%timeit func_pdisty(f_arr, nech, n)
# 1 loop, best of 5: 344 ms per loop
%timeit func2_cy(f_arr, nech, n)
# 100 loops, best of 5: 3.95 ms per loop
nech = n = 4000
f_arr = np.arange(n)
%timeit func_OP(f_arr, nech, n)
# 1 loop, best of 5: 24.3 s per loop
%timeit func(f_arr, nech, n)
# 1 loop, best of 5: 9.27 s per loop
%timeit func2(f_arr, nech, n)
# 1 loop, best of 5: 5.71 s per loop
%timeit func_OP_nb(f_arr, nech, n)
# 10 loops, best of 5: 29 ms per loop
%timeit func_nb(f_arr, nech, n)
# 10 loops, best of 5: 27.3 ms per loop
%timeit func2_nb(f_arr, nech, n)
# 100 loops, best of 5: 12.2 ms per loop
%timeit func_pdisty(f_arr, nech, n)
# 1 loop, best of 5: 706 ms per loop
%timeit func2_cy(f_arr, nech, n)
# 100 loops, best of 5: 15.9 ms per loop

...以及您提供的输入尺寸:

nech = 24 * 30 * 24
n = 70299
f_arr = np.random.random(n)
%timeit -n1 -r1 func_OP(f_arr, nech, n)
# 1 loop, best of 1: 1h 4min 50s per loop
%timeit -n1 -r1 func(f_arr, nech, n)
# 1 loop, best of 1: 21min 14s per loop
%timeit -n1 -r1 func2(f_arr, nech, n)  # only one run / loop
# 1 loop, best of 1: 13min 9s per loop
%timeit func_OP_nb(f_arr, nech, n)
# 1 loop, best of 5: 4.74 s per loop
%timeit func_nb(f_arr, nech, n)
# 1 loop, best of 5: 4 s per loop
%timeit func2_nb(f_arr, nech, n)
# 1 loop, best of 5: 1.62 s per loop
# %timeit func_pdisty(f_arr, nech, n)
# -- MEMORY ERROR --
%timeit func2_cy(f_arr, nech, n)
# 1 loop, best of 5: 2.2 s per loop

免责声明:正如@norok2 在评论中指出的那样,由于使用 pdist,我的方法至少分配了 N*(N-1)/2 个双倍。对于您的 N = 70299 这意味着数组中大约有 18.5 GB 的双精度数。其他索引数组将具有相似的大小。因此,除非您的某些用例较小 N,否则此答案中的矢量化方法仅在您有大量内存时才可行。


正如其他人所指出的,仅仅将代码从一种语言翻译成另一种语言不会导致两种语言的最佳代码。单独使用 Cython 并不能保证 speed-up,就像单独使用 NumPy 不能保证 speed-up.

很好地向您展示了如何使用 numba 或类似的东西来编译您的数字代码。这可以为您提供与 MATLAB 在性能方面非常相似的东西,因为 MATLAB 有自己的 just-in-time (JIT) 编译器。还有回旋余地来优化编译代码,因为多个实现最终可能会产生截然不同的性能。

无论如何,我想指出的是,您还可以通过使用 NumPy 和 SciPy 中的高级功能来加速您的代码。特别是,您想要计算一组 1d 点之间的成对平方距离。这就是 scipy.spatial.distance.pdist 可以为您做的(使用 'sqeuclidean' 平方欧氏范数)。好处是它只计算每个成对距离一次(这对 CPU 和内存性能很好),但缺点是挑选你想要总结的贡献有点麻烦。

无论如何,这是与您的 Python 实现相对应的代码(修复了内部循环使用 np.arange(1, N-i) 而不是 np.arange(1, N-i-1)):

from scipy.spatial.distance import pdist

def pdisty(f, nech):
    offset = f.size - nech
    res = np.zeros_like(f, shape=nech)
    dists = pdist(f[:, None], metric='sqeuclidean')
    counts = np.arange(offset, f.size)[::-1]
    inds = np.repeat(np.arange(res.size), counts)
    np.add.at(res, inds, dists[:inds.size])
    res /= counts
    return res

这里发生的是

  1. 我们计算每对唯一数组值的成对距离并将其存储到 dists
  2. 我们计算每个点涉及的对数(这是我们最后必须归一化的),将其存储到 counts
  3. 计算出dists中每个值对应的第1d索引(这是难点),存入inds
  4. 使用np.add.at累积对适当输出索引的每个贡献
  5. 用计数标准化。

下面是N = 1000的一些时序,其中func2()对应的函数:

>>> %timeit structFunPython(f, f.size - 1)
... %timeit func2(f, f.size - 1)
... %timeit pdisty(f, f.size - 1)
1.48 s ± 89.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
274 ms ± 2.71 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
36.7 ms ± 1.21 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

上述解决方案要快得多,但当然它仍然比完全编译的解决方案慢。如果您遇到额外依赖项或在系统上安装 llvm 的问题,这可能是一个合理的妥协。最重要的是,代码应该适应您尝试优化它的语言。


为了完整起见,这里是我用于比较的实现(我稍微更改了签名,因为 N 可以从输入数组中计算出来,并且我修复了一些 fencepost 错误):

def structFunPython(f, nech):
    """Very slightly modified from the question"""
    N = f.size
    sf2 = np.zeros(nech)
    count = np.zeros(nech)
    for i in range(nech):
        indN = np.arange(1,N-i)
        for j in indN:
            sf2[i] += np.power((f[i+j]-f[i]),2)
            count[i] += 1
        sf2[i] = sf2[i]/count[i]
    return sf2


def func2(f_arr, nech):
    """Very slightly modified from norok2's answer

    See 

    """
    n = f_arr.size
    sf2 = np.zeros(nech)
    for i in range(nech):
        for j in range(1, n - i):
            sf2[i] += (f_arr[i + j] - f_arr[i]) ** 2
        sf2[i] /= (n - i - 1)
    return sf2

使用这些定义,所有三个函数在机器精度内给出相同的结果:

rng = np.random.default_rng()
N = 1000
nech = N - 2
f = rng.random(1000)
assert np.allclose(structFunPython(f, nech), func2(f, nech))
assert np.allclose(structFunPython(f, nech), pdisty(f, nech))

通过一些基本的 python/numpy 重写,我可以大大加快您的代码速度

def structFunPython4(f,N,nech):
    sf2 = np.zeros(N)
    #count = np.zeros(N)
    for i in range(nech):
        # indN = np.arange(1,N-i-1)
        sf2[i] = np.sum(np.power(f[i+1:N-1]-f[i],2))
        #for j in range(1,N-i-1):
        #    sf2[i] += np.power((f[i+j]-f[i]),2)
        #    #count[i] += 1
        sf2[i] = sf2[i]/(N-i-2)
    return sf2

对于适度的样本量:

In [53]: f = np.arange(100); N=f.shape[0]; nech=98

他们匹配:

In [54]: np.allclose(structFunPython(f,N,nech),structFunPython4(f,N,nech))
Out[54]: True

和时间:

In [55]: timeit structFunPython(f,N,nech)
34.4 ms ± 109 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [56]: timeit structFunPython4(f,N,nech)
2.22 ms ± 35.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

我先把for i in indN:换成了for i in range(1,N-1-i):。对数组的迭代比对列表或 range.

的迭代慢

正如其他人指出的那样,我们不需要迭代 count

但最大的变化是用数组切片替换j迭代,打开整个切片和数组求和。

我还没有仔细研究 i 迭代来消除它。 f[i+1:N-1] 切片长度不同,从 nech 到 0。

MATLAB 进行了大量的 jit 编译,因此您可以通过迭代来解决问题。回到 1990 年代,当我使用 MATLAB 时,它的形式可能很糟糕——那些版本需要 whole-matrix 计算才能获得任何合理的速度。 Python 级迭代很慢(解释),并且在数组上比在列表上慢。尽可能使用 whole-array 方法。或者使用numba编译计算。

====

您已经得到了一些不错的答案,这些答案提供了快速的 numpy 和 numba 实现。我想展示一个不错的 Cython 实现以进行公平比较。

首先,让我们为 norok2 在我的机器上实现最快的 numba 计时:

In [3]: %timeit func2_nb(f_arr, n)
3.4 s ± 129 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
  • 为了在 Cython 中获得快速内存访问,为它提供有关输入数组的所有必要信息至关重要。在您的情况下,f_arr 是 C-contiguous np.ndarray,因此我们使用 C-contiguous 内存视图 double[::1] 而不是普通内存视图 double[:]。不同之处在于,索引 C-contiguous 内存视图会减少到纯 C 代码 f_arr[i],而后者会减少到 f_arr[i + f_arr.strides[0]].

  • 接下来,值得一提的是Python幂运算符a**2将被C代码pow(a,2)替代,即我们称[=24] =]-功能。在紧密循环中调用函数会产生不必要的开销,即使在 C 中也是如此。在 C 中,我们只需编写 a*a。所以让我们在 Cython 中做同样的事情。

在代码中:

%%cython -c=-O3 -c=-march=native
# for MSVC use: -c=/Ox -c=/arch:AVX2
#           or: -c=/Ox -c=/arch:AVX512 if your CPU supports AVX512 

import numpy as np
cimport numpy as np
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
def func2_cy(double[::1] f_arr, int nech):
    cdef int n = f_arr.size
    cdef double[::1] sf2 = np.zeros(nech)
    cdef int i, j
    for i in range(nech):
        for j in range(n-i-2):
            sf2[i] += (f_arr[i+j+1]-f_arr[i])*(f_arr[i+j+1]-f_arr[i])
        sf2[i] /= (n - i - 2)
    return np.asarray(sf2)

在我的机器上用 Apple Clang 13.1.6 在 macOS 上编译,这比前面提到的 numba 实现慢:

In [5]: %timeit func2_cy(f_arr, n)
4.2 s ± 32 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

然而,人们应该意识到 Cython 基本上只是一个 Python 到 C 的编译器。这意味着在我们删除所有 python 交互之后,我们可以尝试使用与使用 C 代码时相同的性能技巧:传递优化标志 -O3 并启用所有可用的 CPU 指令在当前平台上 -march=native。请注意,这也意味着良好的 Cython 代码(即在紧密循环中没有 python 交互的代码)的性能在很大程度上取决于您的 C 编译器。根据我的经验,由于 MSVC 的问题 auto-vectorization,numba 在 Windows 上通常比 Cython 快。在 macOS/Linux 和 gcc 或 clang 上,通常是相反的。

这些 well-known 性能技巧之一是 loop-unrolling 以便为编译器提供 SIMD-vectorize 特定循环的提示。展开最内层的循环,函数如下所示:

%%cython -c=-O3 -c=-march=native
# for MSVC use: -c=/Ox -c=/arch:AVX2
#           or: -c=/Ox -c=/arch:AVX512 if your CPU supports AVX512 

#cython: cdivision=True

import numpy as np
cimport numpy as np
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
def func3_cy(double[::1] f_arr, int nech):
    cdef int n = f_arr.size
    cdef double[::1] sf2 = np.zeros(nech)
    cdef double fi, fij, fij1, fij2, fij3
    cdef int i, j, ntmp
    for i in range(nech):
        # unroll the inner loop for better SIMD vectorization
        j = 1
        ntmp = (n-i-2) - ((n-i-2) % 4)
        # we use a while-loop since Cython doesn't support range(j, ntmp, 4)
        while j < ntmp:
            # unpack the values and hope the CPU keeps them in its register
            fi   = f_arr[i]
            fij  = f_arr[i+j+1]
            fij1 = f_arr[i+j+2]
            fij2 = f_arr[i+j+3]
            fij3 = f_arr[i+j+4]
            sf2[i] += ((fij-fi)*(fij-fi) + (fij1-fi)*(fij1-fi) + (fij2-fi)*(fij2-fi) + (fij3-fi)*(fij3-fi))
            j += 4
        for j in range(ntmp, n - i - 2):
            sf2[i] += (f_arr[i+j+1] - f_arr[i]) * (f_arr[i+j+1] - f_arr[i])
        sf2[i] /= (n-i-2)
    return np.asarray(sf2)

在我的机器上,这比 numba 快近两倍:

In [7]: %timeit func3_cy(f_arr, n)
1.7 s ± 54.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

更进一步,我们可以在 prange which is just a thin wrapper around OpenMP:

的帮助下并行化外循环
%%cython -c=-O3 -c=-march=native -c=-fopenmp --link-args=-fopenmp
# for MSVC use: -c=/Ox -c=/openmp -c=/arch:AVX2
#           or: -c=/Ox -c=/openmp -c=/arch:AVX512 if your CPU supports AVX512 

#cython: cdivision=True

import numpy as np
cimport numpy as np
cimport cython
from cython.parallel cimport prange

@cython.boundscheck(False)
@cython.wraparound(False)
def func4_cy(double[::1] f_arr, int nech):
    cdef int n = f_arr.size
    cdef double[::1] sf2 = np.zeros(nech)
    cdef double fi, fij, fij1, fij2, fij3
    cdef int i, j, ntmp
    for i in prange(nech, nogil=True, schedule="static", chunksize=1):
        # unroll the inner loop for better SIMD vectorization
        j = 1
        ntmp = (n-i-2) - ((n-i-2) % 4)
        # we use a while-loop since Cython doesn't support range(j, ntmp, 4)
        while j < ntmp:
            # unpack the values and hope the CPU keeps them in its register
            fi   = f_arr[i]
            fij  = f_arr[i+j+1]
            fij1 = f_arr[i+j+2]
            fij2 = f_arr[i+j+3]
            fij3 = f_arr[i+j+4]
            sf2[i] += ((fij-fi)*(fij-fi) + (fij1-fi)*(fij1-fi) + (fij2-fi)*(fij2-fi) + (fij3-fi)*(fij3-fi))
            j += 4
        for j in range(ntmp, n - i - 2):
            sf2[i] += (f_arr[i+j+1] - f_arr[i]) * (f_arr[i+j+1] - f_arr[i])
        sf2[i] /= (n-i-2)
    return np.asarray(sf2)

在我有 8 CPU 个核心的机器上,这是迄今为止最快的实现:

In [9]: %timeit func4_cy(f_arr, n)
329 ms ± 15.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

但是,值得一提的是,Numba 还支持线程并行性,因此我希望 Numba 具有类似的性能。