处理 NumPy 数组循环的最有效方法是什么?

What is the most efficient way to deal with a loop on NumPy arrays?

问题很简单:这是我目前的算法。由于阵列上的循环,这非常慢。有没有办法更改它以避免循环并利用 NumPy 数组类型?

import numpy as np

def loopingFunction(listOfVector1, listOfVector2):
    resultArray = []

    for vector1 in listOfVector1:
        result = 0

        for vector2 in listOfVector2:
            result += np.dot(vector1, vector2) * vector2[2]

        resultArray.append(result)

    return np.array(resultArray)

listOfVector1x = np.linspace(0,0.33,1000)
listOfVector1y = np.linspace(0.33,0.66,1000)
listOfVector1z = np.linspace(0.66,1,1000)

listOfVector1 = np.column_stack((listOfVector1x, listOfVector1y, listOfVector1z))

listOfVector2x = np.linspace(0.33,0.66,1000)
listOfVector2y = np.linspace(0.66,1,1000)
listOfVector2z = np.linspace(0, 0.33, 1000)

listOfVector2 = np.column_stack((listOfVector2x, listOfVector2y, listOfVector2z))

result = loopingFunction(listOfVector1, listOfVector2)

我应该处理非常大的数组,每个数组有超过 1000 个向量。所以如果你有什么建议,我会接受的。

你至少可以去掉两个for循环来节省很多时间,直接用矩阵计算

import time

import numpy as np

def loopingFunction(listOfVector1, listOfVector2):
    resultArray = []

    for vector1 in listOfVector1:
        result = 0

        for vector2 in listOfVector2:
            result += np.dot(vector1, vector2) * vector2[2]

        resultArray.append(result)

    return np.array(resultArray)

def loopingFunction2(listOfVector1, listOfVector2):
    resultArray = np.sum(np.dot(listOfVector1, listOfVector2.T) * listOfVector2[:,2], axis=1)

    return resultArray

listOfVector1x = np.linspace(0,0.33,1000)
listOfVector1y = np.linspace(0.33,0.66,1000)
listOfVector1z = np.linspace(0.66,1,1000)

listOfVector1 = np.column_stack((listOfVector1x, listOfVector1y, listOfVector1z))

listOfVector2x = np.linspace(0.33,0.66,1000)
listOfVector2y = np.linspace(0.66,1,1000)
listOfVector2z = np.linspace(0, 0.33, 1000)

listOfVector2 = np.column_stack((listOfVector2x, listOfVector2y, listOfVector2z))
import time
t0 = time.time()
result = loopingFunction(listOfVector1, listOfVector2)
print('time old version',time.time() - t0)
t0 = time.time()
result2 = loopingFunction2(listOfVector1, listOfVector2)
print('time matrix computation version',time.time() - t0)
print('Are results are the same',np.allclose(result,result2))

给出

time old version 1.174513578414917
time matrix computation version 0.011968612670898438
Are results are the same True

基本上循环越少越好

避免嵌套循环,调整计算顺序,比优化后np.einsum快20倍,比原程序快近400_000倍:

>>> out = listOfVector1.dot(listOfVector2[:, 2].dot(listOfVector2))
>>> np.allclose(out, loopingFunction(listOfVector1, listOfVector2))
True

测试:

>>> timeit(lambda: loopingFunction(listOfVector1, listOfVector2), number=1)
1.4389081999834161
>>> timeit(lambda: listOfVector1.dot(listOfVector2[:, 2].dot(listOfVector2)), number=400_000)
1.3162514999858104
>>> timeit(lambda: np.einsum('ij, kj, k->i', listOfVector1, listOfVector2, listOfVector2[:, 2], optimize=['einsum_path', (1, 2), (0, 1)]), number=18_000)
1.3501517999975476

强制性np.einsum基准

r2 = np.einsum('ij, kj, k->i', listOfVector1, listOfVector2, listOfVector2[:,2], optimize=['einsum_path', (1, 2), (0, 1)])
#%timeit result: 10000 loops, best of 5: 116 µs per loop

np.testing.assert_allclose(result, r2)

为了好玩,我编写了一个优化的 Numba 实现,优于所有其他实现。它基于@MichaelSzczesny 答案的einsum 优化。

import numpy as np
import numba as nb

# This decorator ask Numba to eagerly compile the code using 
# the provided signature string (containing the parameter types).
@nb.njit('(float64[:,::1], float64[:,::1])')
def loopingFunction_numba(listOfVector1, listOfVector2):
    n, m = listOfVector1.shape
    assert m == 3

    result = np.empty(n)
    s1 = s2 = s3 = 0.0

    for i in range(n):
        factor = listOfVector2[i, 2]
        s1 += listOfVector2[i, 0] * factor
        s2 += listOfVector2[i, 1] * factor
        s3 += listOfVector2[i, 2] * factor

    for i in range(n):
        result[i] = listOfVector1[i, 0] * s1 + listOfVector1[i, 1] * s2 + listOfVector1[i, 2] * s3

    return result

result = loopingFunction_numba(listOfVector1, listOfVector2)

这是我的 i5-9600KF 处理器的时序:

Initial:          1052.0 ms
ymmx:                5.121 ms
MichaelSzczesny:        75.40 us
MechanicPig:             3.36 us
Numba:                   2.74 us
Optimal lower bound:     0.66 us

此解决方案比原始解决方案快 ~384_000 倍。请注意,它甚至不使用处理器的 SIMD 指令,这会使我的机器加速约 4 倍。这只有通过比当前输入多 SIMD-friendly 的转置输入才有可能。换位也可以加快其他答案的速度,例如 MechanicPig 的答案,因为 BLAS 通常可以从中受益。生成的代码将达到符号 1_000_000 加速因子!