一种加速平滑函数的方法

A way to speed up the smoothing function

我有一个函数,用于通过对因子 2 取移动平均值来平滑曲线。但是,在当前形式下,由于循环,该函数很慢。我添加了 numba 来提高速度,但速度仍然很慢。关于如何提高速度有什么建议吗?

from numba import prange, jit
@jit(nopython=True, parallel=True)      
def smoothing_function(x,y, window=2, pad = 1):
    len_x    = len(x)
    max_x    = np.max(x)
    xoutmid  = np.full(len_x, np.nan)
    xoutmean = np.full(len_x, np.nan)
    yout     = np.full(len_x, np.nan)
    
    for i in prange(len_x):
        x0 = x[i]
        xf = window*x[i]
        
        if xf < max_x:
            e = np.where(x  == x.flat[np.abs(x - xf).argmin()])[0][0]
            if e<len(x):
                yout[i]     = np.nanmean(y[i:e])
                xoutmid[i]  = x[i] + np.log10(0.5) * (x[i] - x[e])
                xoutmean[i] = np.nanmean(x[i:e])
    return xoutmid, xoutmean, yout 
# Working example

f = lambda x: x**(-1.7)*2*np.random.rand(len(x))
x = np.logspace(np.log10(1e-5), np.log10(1), 1000)
xvals, yvals  = x, f(x)


%timeit res =smoothing_function(xvals, yvals, window=2, pad = 1)

# plot results
plt.loglog(xvals, yvals)
plt.loglog(res[1], res[2])

问题是您计算结束索引 (e) 的效率非常低。如果您使用 x 在 logspace 中这一事实,这可以更快地完成,因为您知道 2 个连续点之间的距离并且只需要计算索引即 log(window ) 远离初始点。工作示例如下:

@jit(nopython=True, parallel=True)
def smoothing_function2(x,y, window=2, pad = 1):
    len_x    = len(x)
    max_x    = np.max(x)
    xoutmid  = np.full(len_x, np.nan)
    xoutmean = np.full(len_x, np.nan)
    yout     = np.full(len_x, np.nan)

    f_idx = int(len(x)*np.log10(window)/(np.log10(x[-1])-np.log10(x[0])))

    for i in prange(len_x):
        if window*x[i] < max_x:
            e = min(i+f_idx, len_x-1)
            yout[i]     = np.nanmean(y[i:e])
            xoutmid[i]  = x[i] + np.log10(0.5) * (x[i] - x[e])
            xoutmean[i] = np.nanmean(x[i:e])
    return xoutmid, xoutmean, yout

f = lambda x: x**(-1.7)*2*np.random.rand(len(x))
x = np.logspace(np.log10(1e-5), np.log10(1), 1000)
xvals, yvals  = x, f(x)

res1 = smoothing_function(xvals, yvals, window=2, pad = 1)
res2 = smoothing_function2(xvals, yvals, window=2, pad = 1)

print([np.nansum((r1 - r2)**2) for r1, r2 in zip(res1, res2)])  # verify that all the outputs are same

%timeit res = smoothing_function(xvals, yvals, window=2, pad = 1)

%timeit res = smoothing_function2(xvals, yvals, window=2, pad = 1)

输出如下:

[0.0, 0.0, 0.0]
337 µs ± 59.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
49.1 µs ± 2.6 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

这验证了两个函数 return 相同的输出但是 smoothing_function2 快了 ~6.8 倍。如果 x 不限于在 logspace 中,您仍然可以使用您正在使用的任何 space 的属性来获得类似的改进。可能有更多方法可以进一步改进这一点,这取决于您的目标是什么。您也可以尝试在 C++ 或 Cython 中实现它。

@Abhinav 的解决方案非常有效。另一个稍微快一点的解决方案是这个:

from numba import prange, jit
@jit(nopython=True, parallel=True)      
def smoothing_function(x,y, window=2, pad = 1):
    def bisection(array,value):
        '''Given an ``array`` , and given a ``value`` , returns an index j such that ``value`` is between array[j]
        and array[j+1]. ``array`` must be monotonic increasing. j=-1 or j=len(array) is returned
        to indicate that ``value`` is out of range below and above respectively.'''
        n = len(array)
        if (value < array[0]):
            return -1
        elif (value > array[n-1]):
            return n
        jl = 0# Initialize lower
        ju = n-1# and upper limits.
        while (ju-jl > 1):# If we are not yet done,
            jm=(ju+jl) >> 1# compute a midpoint with a bitshift
            if (value >= array[jm]):
                jl=jm# and replace either the lower limit
            else:
                ju=jm# or the upper limit, as appropriate.
            # Repeat until the test condition is satisfied.
        if (value == array[0]):# edge cases at bottom
            return 0
        elif (value == array[n-1]):# and top
            return n-1
        else:
            return jl

    len_x    = len(x)
    max_x    = np.max(x)
    xoutmid  = np.full(len_x, np.nan)
    xoutmean = np.full(len_x, np.nan)
    yout     = np.full(len_x, np.nan)
    
    for i in prange(len_x):
        x0 = x[i]
        xf = window*x0
        
        if xf < max_x:
            #e = np.where(x  == x[np.abs(x - xf).argmin()])[0][0]
            e = bisection(x,xf)
            if e<len_x:
                yout[i]     = np.nanmean(y[i:e])
                xoutmid[i]  = x0 + np.log10(0.5) * (x0 - x[e])
                xoutmean[i] = np.nanmean(x[i:e])

    return xoutmid, xoutmean, yout