在二维数组上使用 numpy.interp 的最快方法

fastest way to use numpy.interp on a 2-D array

我有以下问题。我正在尝试找到在二维 x 坐标数组上使用 numpy 插值方法的最快方法。

import numpy as np

xp = [0.0, 0.25, 0.5, 0.75, 1.0]

np.random.seed(100)
x = np.random.rand(10)
fp = np.random.rand(10, 5)

所以基本上,xp 将是数据点的 x 坐标,x 将是一个包含我要插值的值的 x 坐标的数组,fp 将是一个二维数组,其中包含数据点的 y 坐标。

xp
[0.0, 0.25, 0.5, 0.75, 1.0]

x
array([ 0.54340494,  0.27836939,  0.42451759,  0.84477613,  0.00471886,
        0.12156912,  0.67074908,  0.82585276,  0.13670659,  0.57509333])

fp
array([[ 0.89132195,  0.20920212,  0.18532822,  0.10837689,  0.21969749],
       [ 0.97862378,  0.81168315,  0.17194101,  0.81622475,  0.27407375],
       [ 0.43170418,  0.94002982,  0.81764938,  0.33611195,  0.17541045],
       [ 0.37283205,  0.00568851,  0.25242635,  0.79566251,  0.01525497],
       [ 0.59884338,  0.60380454,  0.10514769,  0.38194344,  0.03647606],
       [ 0.89041156,  0.98092086,  0.05994199,  0.89054594,  0.5769015 ],
       [ 0.74247969,  0.63018394,  0.58184219,  0.02043913,  0.21002658],
       [ 0.54468488,  0.76911517,  0.25069523,  0.28589569,  0.85239509],
       [ 0.97500649,  0.88485329,  0.35950784,  0.59885895,  0.35479561],
       [ 0.34019022,  0.17808099,  0.23769421,  0.04486228,  0.50543143]])

期望的结果应该是这样的:

array([ 0.17196795,  0.73908678,  0.85459966,  0.49980648,  0.59893702,
        0.9344241 ,  0.19840596,  0.45777785,  0.92570835,  0.17977264])

再次寻找最快的方法,因为这是我的问题的简化版本,它的长度约为 100 万对 10。

谢谢

所以基本上你希望输出等同于

np.array([np.interp(x[i], xp, fp[i]) for i in range(x.size)])

但是 for 循环将使大型 x.size

的循环变得相当慢

这应该有效:

def multiInterp(x, xp, fp):
    i, j = np.nonzero(np.diff(np.array(xp)[None,:] < x[:,None]))
    d = (x - xp[j]) / np.diff(xp)[j]
    return fp[i, j] + np.diff(fp)[i, j] * d

编辑:这个效果更好,可以处理更大的数组:

def multiInterp2(x, xp, fp):
    i = np.arange(x.size)
    j = np.searchsorted(xp, x) - 1
    d = (x - xp[j]) / (xp[j + 1] - xp[j])
    return (1 - d) * fp[i, j] + fp[i, j + 1] * d

测试:

multiInterp2(x, xp, fp)
Out: 
array([ 0.17196795,  0.73908678,  0.85459966,  0.49980648,  0.59893702,
        0.9344241 ,  0.19840596,  0.45777785,  0.92570835,  0.17977264])

使用原始数据进行时序测试:

    %timeit multiInterp2(x, xp, fp)
The slowest run took 6.87 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 25.5 µs per loop

%timeit np.concatenate([compiled_interp(x[[i]], xp, fp[i]) for i in range(fp.shape[0])])
The slowest run took 4.03 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 39.3 µs per loop

即使 x

的小尺寸似乎也更快

让我们尝试一些更大、更大的东西:

n = 10000
m = 10000

xp = np.linspace(0, 1, n)
x = np.random.rand(m)
fp = np.random.rand(m, n)

%timeit b()  # kazemakase's above
10 loops, best of 3: 38.4 ms per loop

%timeit multiInterp2(x, xp, fp)
100 loops, best of 3: 2.4 ms per loop

优点甚至比 np.interp

的编译版本更好

np.interp 基本上是编译后的 numpy.core.multiarray.interp 的包装器。我们可以通过直接使用它来削减一点性能:

from numpy.core.multiarray import interp as compiled_interp

def a(x=x, xp=xp, fp=fp):
    return np.array([np.interp(x[i], xp, fp[i]) for i in range(fp.shape[0])])

def b(x=x, xp=xp, fp=fp):
    return np.concatenate([compiled_interp(x[[i]], xp, fp[i]) for i in range(fp.shape[0])])

def multiInterp(x=x, xp=xp, fp=fp):
    i, j = np.nonzero(np.diff(xp[None,:] < x[:,None]))
    d = (x - xp[j]) / np.diff(xp)[j]
    return fp[i, j] + np.diff(fp)[i, j] * d

时序测试表明,对于示例数组,这与 Daniel Forsman 的出色解决方案不相上下:

%timeit a()
10000 loops, best of 3: 44.7 µs per loop

%timeit b()
10000 loops, best of 3: 32 µs per loop

%timeit multiInterp()
10000 loops, best of 3: 33.3 µs per loop

更新

对于稍大的数组,multiInterp 拥有发言权:

n = 100
m = 1000

xp = np.linspace(0, 1, n)
x = np.random.rand(m)
fp = np.random.rand(m, n)

%timeit a()
100 loops, best of 3: 4.14 ms per loop

%timeit b()
100 loops, best of 3: 2.97 ms per loop

%timeit multiInterp()
1000 loops, best of 3: 1.42 ms per loop

但对于更大的它就落后了:

n = 1000
m = 10000

%timeit a()
10 loops, best of 3: 43.3 ms per loop

%timeit b()
10 loops, best of 3: 32.9 ms per loop

%timeit multiInterp()
10 loops, best of 3: 132 ms per loop

最后,对于非常大的数组(我在 32 位),临时数组成为一个问题:

n = 10000
m = 10000

%timeit a()
10 loops, best of 3: 46.2 ms per loop

%timeit b()
10 loops, best of 3: 32.1 ms per loop

%timeit multiInterp()
# MemoryError