如何加速即使使用 Numba 也很慢的计算

How to speed up the computation that is slow even with Numba

我的 Python 代码计算缓慢,遇到了问题。根据下面的 pycallgraph,瓶颈似乎是名为 miepython.miepython.mie_S1_S2 的模块(以粉红色突出显示),每次调用需要 0.47 秒。

本模块源码如下:

import numpy as np
from numba import njit, int32, float64, complex128

__all__ = ('ez_mie',
           'ez_intensities',
           'generate_mie_costheta',
           'i_par',
           'i_per',
           'i_unpolarized',
           'mie',
           'mie_S1_S2',
           'mie_cdf',
           'mie_mu_with_uniform_cdf',
           )


@njit((complex128, float64, float64[:]), cache=True)
def _mie_S1_S2(m, x, mu):
    """
    Calculate the scattering amplitude functions for spheres.

    The amplitude functions have been normalized so that when integrated
    over all 4*pi solid angles, the integral will be qext*pi*x**2.

    The units are weird, sr**(-0.5)

    Args:
        m: the complex index of refraction of the sphere
        x: the size parameter of the sphere
        mu: array of angles, cos(theta), to calculate scattering amplitudes

    Returns:
        S1, S2: the scattering amplitudes at each angle mu [sr**(-0.5)]
    """
    nstop = int(x + 4.05 * x**0.33333 + 2.0) + 1
    a = np.zeros(nstop - 1, dtype=np.complex128)
    b = np.zeros(nstop - 1, dtype=np.complex128)
    _mie_An_Bn(m, x, a, b)

    nangles = len(mu)
    S1 = np.zeros(nangles, dtype=np.complex128)
    S2 = np.zeros(nangles, dtype=np.complex128)

    nstop = len(a)
    for k in range(nangles):
        pi_nm2 = 0
        pi_nm1 = 1
        for n in range(1, nstop):
            tau_nm1 = n * mu[k] * pi_nm1 - (n + 1) * pi_nm2

            S1[k] += (2 * n + 1) * (pi_nm1 * a[n - 1]
                                    + tau_nm1 * b[n - 1]) / (n + 1) / n

            S2[k] += (2 * n + 1) * (tau_nm1 * a[n - 1]
                                    + pi_nm1 * b[n - 1]) / (n + 1) / n

            temp = pi_nm1
            pi_nm1 = ((2 * n + 1) * mu[k] * pi_nm1 - (n + 1) * pi_nm2) / n
            pi_nm2 = temp

    # calculate norm = sqrt(pi * Qext * x**2)
    n = np.arange(1, nstop + 1)
    norm = np.sqrt(2 * np.pi * np.sum((2 * n + 1) * (a.real + b.real)))

    S1 /= norm
    S2 /= norm

    return [S1, S2]

显然,源代码是由 Numba 编译的,因此它应该比实际速度更快。此函数中 for 循环的迭代次数约为 25,000 (len(mu)=50, len(a)-1=500).

关于如何加快此计算的任何想法?是否有什么东西阻碍了 Numba 的快速计算?或者,你觉得计算速度已经足够快了?

[更多详情]

在上面,使用了另一个函数_mie_An_Bn。这个函数也是jitted的,源码如下:

@njit((complex128, float64, complex128[:], complex128[:]), cache=True)
def _mie_An_Bn(m, x, a, b):
    """
    Compute arrays of Mie coefficients A and B for a sphere.

    This estimates the size of the arrays based on Wiscombe's formula. The length
    of the arrays is chosen so that the error when the series are summed is
    around 1e-6.

    Args:
        m: the complex index of refraction of the sphere
        x: the size parameter of the sphere

    Returns:
        An, Bn: arrays of Mie coefficents
    """
    psi_nm1 = np.sin(x)                   # nm1 = n-1 = 0
    psi_n = psi_nm1 / x - np.cos(x)       # n = 1
    xi_nm1 = complex(psi_nm1, np.cos(x))
    xi_n = complex(psi_n, np.cos(x) / x + np.sin(x))

    nstop = len(a)
    if m.real > 0.0:
        D = _D_calc(m, x, nstop + 1)

        for n in range(1, nstop):
            temp = D[n] / m + n / x
            a[n - 1] = (temp * psi_n - psi_nm1) / (temp * xi_n - xi_nm1)
            temp = D[n] * m + n / x
            b[n - 1] = (temp * psi_n - psi_nm1) / (temp * xi_n - xi_nm1)
            xi = (2 * n + 1) * xi_n / x - xi_nm1
            xi_nm1 = xi_n
            xi_n = xi
            psi_nm1 = psi_n
            psi_n = xi_n.real

    else:
        for n in range(1, nstop):
            a[n - 1] = (n * psi_n / x - psi_nm1) / (n * xi_n / x - xi_nm1)
            b[n - 1] = psi_n / xi_n
            xi = (2 * n + 1) * xi_n / x - xi_nm1
            xi_nm1 = xi_n
            xi_n = xi
            psi_nm1 = psi_n
            psi_n = xi_n.real

示例输入如下:

m = 1.336-2.462e-09j
x = 8526.95
mu = np.array([-1., -0.7500396, 0.46037385, 0.5988121, 0.67384093, 0.72468684, 0.76421644, 0.79175856, 0.81723714, 0.83962897, 0.85924182, 0.87641596, 0.89383665, 0.90708978, 0.91931481, 0.93067567, 0.94073113, 0.94961222, 0.95689496, 0.96467123,  0.97138347, 0.97791831, 0.98339434, 0.98870543, 0.99414948, 0.9975728   0.9989995, 0.9989995, 0.9989995, 0.9989995, 0.9989995,0.99899951, 0.99899951, 0.99899951, 0.99899951, 0.99899951, 0.99899951, 0.99899951, 0.99899951, 0.99899951, 0.99899952,  0.99899952,
  0.99899952,  0.99899952,  0.99899952,  0.99899952,  0.99899952, 0.99899952, 0.99899952,  1.        ])

我关注 _mie_S1_S2,因为它似乎是所提供示例数据集上最昂贵的函数。

首先,如果没有+Inf、-Inf、-0等值,可以给JIT使用参数fastmath=True来加速计算或 NaN 计算。

然后你可以预计算一些包含除法或隐式整数到浮点数转换的昂贵表达式。请注意 (2 * n + 1) / n = 2 + 1/n(n + 1) / n = 1 + 1/n。这对于减少预计算数组的数量很有用,但不会改变我机器上的性能(这可能会因目标架构而改变)。另请注意,这样的预计算对结果准确性有轻微影响(大部分时间可以忽略不计,有时比参考实现更好)。

在我的机器上,此策略使代码在使用 fastmath=True 时速度提高 4.5 倍,在没有使用时速度提高 2.8 倍。

基于 k 的循环可以 并行化 使用 Numba 的 parallel=Trueprange。然而,这可能并不总是在所有机器上都更快(尤其是那些有很多内核的机器),因为循环非常快。

这是最终代码:

@njit((complex128, float64, float64[:]), cache=True, parallel=True)
def _mie_S1_S2_opt(m, x, mu):
    nstop = int(x + 4.05 * x**0.33333 + 2.0) + 1
    a = np.zeros(nstop - 1, dtype=np.complex128)
    b = np.zeros(nstop - 1, dtype=np.complex128)
    _mie_An_Bn(m, x, a, b)

    nangles = len(mu)
    S1 = np.zeros(nangles, dtype=np.complex128)
    S2 = np.zeros(nangles, dtype=np.complex128)

    factor1 = np.empty(nstop, dtype=np.float64)
    factor2 = np.empty(nstop, dtype=np.float64)
    factor3 = np.empty(nstop, dtype=np.float64)

    for n in range(1, nstop):
        factor1[n - 1] = (2 * n + 1) / (n + 1) / n
        factor2[n - 1] = (2 * n + 1) / n
        factor3[n - 1] = (n + 1) / n

    nstop = len(a)
    for k in nb.prange(nangles):
        pi_nm2 = 0
        pi_nm1 = 1
        for n in range(1, nstop):
            i = n - 1
            tau_nm1 = n * mu[k] * pi_nm1 - (n + 1.0) * pi_nm2

            S1[k] += factor1[i] * (pi_nm1 * a[i] + tau_nm1 * b[i])
            S2[k] += factor1[i] * (tau_nm1 * a[i] + pi_nm1 * b[i])

            temp = pi_nm1
            pi_nm1 = factor2[i] * mu[k] * pi_nm1 - factor3[i] * pi_nm2
            pi_nm2 = temp

    # calculate norm = sqrt(pi * Qext * x**2)
    n = np.arange(1, nstop + 1)
    norm = np.sqrt(2 * np.pi * np.sum((2 * n + 1) * (a.real + b.real)))

    S1 /= norm
    S2 /= norm

    return [S1, S2]

%timeit -n 1000 _mie_S1_S2_opt(m, x, mu)

在我的 6 核机器上,最终优化的实现速度 12 倍 fastmath=True 和 8.8 倍。请注意,在其他功能中使用类似的策略也可能有助于加快速度。