Fastest way to use numpy.interp on a 2-D array
I have the following problem. I am trying to find the fastest way to use numpy's interpolation method on a 2-D array of x-coordinates.
import numpy as np
xp = [0.0, 0.25, 0.5, 0.75, 1.0]
np.random.seed(100)
x = np.random.rand(10)
fp = np.random.rand(10, 5)
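So basically, xp would be the x-coordinates of the data points, x would be an array containing the x-coordinates of the values I want to interpolate to, and fp would be a 2-D array containing the y-coordinates of the data points.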
xp
[0.0, 0.25, 0.5, 0.75, 1.0]
x
array([ 0.54340494, 0.27836939, 0.42451759, 0.84477613, 0.00471886,
0.12156912, 0.67074908, 0.82585276, 0.13670659, 0.57509333])
fp
array([[ 0.89132195, 0.20920212, 0.18532822, 0.10837689, 0.21969749],
[ 0.97862378, 0.81168315, 0.17194101, 0.81622475, 0.27407375],
[ 0.43170418, 0.94002982, 0.81764938, 0.33611195, 0.17541045],
[ 0.37283205, 0.00568851, 0.25242635, 0.79566251, 0.01525497],
[ 0.59884338, 0.60380454, 0.10514769, 0.38194344, 0.03647606],
[ 0.89041156, 0.98092086, 0.05994199, 0.89054594, 0.5769015 ],
[ 0.74247969, 0.63018394, 0.58184219, 0.02043913, 0.21002658],
[ 0.54468488, 0.76911517, 0.25069523, 0.28589569, 0.85239509],
[ 0.97500649, 0.88485329, 0.35950784, 0.59885895, 0.35479561],
[ 0.34019022, 0.17808099, 0.23769421, 0.04486228, 0.50543143]])
The desired outcome should be this:
array([ 0.17196795, 0.73908678, 0.85459966, 0.49980648, 0.59893702,
0.9344241 , 0.19840596, 0.45777785, 0.92570835, 0.17977264])
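To make the row-wise pairing concrete, here is a small sketch of how the first entry of that result arises from the values printed above: x[0] is interpolated against fp[0] only. The names x0, fp0 and by_hand are just for illustration.

```python
import numpy as np

xp = [0.0, 0.25, 0.5, 0.75, 1.0]
x0 = 0.54340494  # x[0] from the question
fp0 = [0.89132195, 0.20920212, 0.18532822, 0.10837689, 0.21969749]  # fp[0]

# x0 lies between xp[2] = 0.5 and xp[3] = 0.75
d = (x0 - xp[2]) / (xp[3] - xp[2])       # fractional position in that interval
by_hand = (1 - d) * fp0[2] + d * fp0[3]  # linear blend of the two neighbours

print(by_hand, np.interp(x0, xp, fp0))   # both approximately 0.171968
```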
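Again, I am looking for the fastest way to do this, because this is a simplified version of my problem, where the lengths are about 1 million rather than 10.
Thanks
So basically you want output equivalent to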
np.array([np.interp(x[i], xp, fp[i]) for i in range(x.size)])
but that for loop is going to make it pretty slow for large x.size.
This should work:
def multiInterp(x, xp, fp):
    xp = np.asarray(xp)  # the question passes xp as a list; indexing with j needs an array
    i, j = np.nonzero(np.diff(xp[None, :] < x[:, None]))
    d = (x - xp[j]) / np.diff(xp)[j]
    return fp[i, j] + np.diff(fp)[i, j] * d
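In case the nonzero/diff line is not obvious, here is a small sketch of what it does, using made-up query values. Each row of the broadcast comparison is True for the grid points left of x[i], and the position where True flips to False marks the left edge of the bracketing interval. The sketch writes the flip test with != on neighbouring columns, which is what np.diff does for boolean input.

```python
import numpy as np

xp = np.array([0.0, 0.25, 0.5, 0.75, 1.0])
x = np.array([0.54, 0.28])  # illustrative query points

# below[i, k] is True where grid point xp[k] lies left of x[i]
below = xp[None, :] < x[:, None]

# Compare neighbouring columns: the single True per row marks the interval
flips = below[:, 1:] != below[:, :-1]
i, j = np.nonzero(flips)

print(i, j)  # [0 1] [2 1]
```

Note that a row with x[i] <= xp[0] or x[i] > xp[-1] has no flip at all and drops out of i and j, so this approach assumes every x lies strictly inside the grid.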
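EDIT: This works even better and can handle bigger arrays:
def multiInterp2(x, xp, fp):
    xp = np.asarray(xp)  # allow xp to be passed as a plain list
    i = np.arange(x.size)
    j = np.searchsorted(xp, x) - 1
    d = (x - xp[j]) / (xp[j + 1] - xp[j])
    return (1 - d) * fp[i, j] + fp[i, j + 1] * d
Testing: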
multiInterp2(x, xp, fp)
Out:
array([ 0.17196795, 0.73908678, 0.85459966, 0.49980648, 0.59893702,
0.9344241 , 0.19840596, 0.45777785, 0.92570835, 0.17977264])
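One caveat worth spelling out: multiInterp2 assumes every x lies strictly above xp[0] and no higher than xp[-1]. At x == xp[0] (or below) j becomes -1, and above xp[-1] the index j + 1 runs off the end. Below is a minimal sketch of a variant that clamps to the end values the way np.interp does by default. The name multi_interp_clipped is hypothetical, and it assumes xp is strictly increasing with at least two points.

```python
import numpy as np

def multi_interp_clipped(x, xp, fp):
    """Row-wise linear interpolation: out[i] ~ np.interp(x[i], xp, fp[i]).

    Hypothetical variant of multiInterp2 that clamps out-of-range x
    to the first/last value of each row.
    """
    x = np.asarray(x, dtype=float)
    xp = np.asarray(xp, dtype=float)
    fp = np.asarray(fp, dtype=float)
    i = np.arange(x.size)
    # Index of the left neighbour, kept inside [0, len(xp) - 2]
    j = np.clip(np.searchsorted(xp, x) - 1, 0, xp.size - 2)
    # Fractional position in the interval, clamped to [0, 1]
    d = np.clip((x - xp[j]) / (xp[j + 1] - xp[j]), 0.0, 1.0)
    return (1 - d) * fp[i, j] + d * fp[i, j + 1]
```

The two np.clip calls add a little work per call, so if you know your x values are always inside the grid, the unclipped version is the leaner choice.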
Timing tests with the original data:
%timeit multiInterp2(x, xp, fp)
The slowest run took 6.87 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 25.5 µs per loop
%timeit np.concatenate([compiled_interp(x[[i]], xp, fp[i]) for i in range(fp.shape[0])])
The slowest run took 4.03 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 39.3 µs per loop
Seems to be faster even on a small size of x.
Let's try something much, much bigger:
n = 10000
m = 10000
xp = np.linspace(0, 1, n)
x = np.random.rand(m)
fp = np.random.rand(m, n)
%timeit b() # kazemakase's above
10 loops, best of 3: 38.4 ms per loop
%timeit multiInterp2(x, xp, fp)
100 loops, best of 3: 2.4 ms per loop
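Performance is much better, even compared with the compiled version of np.interp.
np.interp is basically a wrapper around the compiled numpy.core.multiarray.interp. We can shave off a bit of overhead by using it directly: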
from numpy.core.multiarray import interp as compiled_interp

def a(x=x, xp=xp, fp=fp):
    return np.array([np.interp(x[i], xp, fp[i]) for i in range(fp.shape[0])])

def b(x=x, xp=xp, fp=fp):
    return np.concatenate([compiled_interp(x[[i]], xp, fp[i]) for i in range(fp.shape[0])])

def multiInterp(x=x, xp=xp, fp=fp):
    i, j = np.nonzero(np.diff(xp[None,:] < x[:,None]))
    d = (x - xp[j]) / np.diff(xp)[j]
    return fp[i, j] + np.diff(fp)[i, j] * d
Timing tests show that, for the example arrays, this is on par with Daniel Forsman's nice solution:
%timeit a()
10000 loops, best of 3: 44.7 µs per loop
%timeit b()
10000 loops, best of 3: 32 µs per loop
%timeit multiInterp()
10000 loops, best of 3: 33.3 µs per loop
Update
For somewhat larger arrays, multiInterp takes the lead:
n = 100
m = 1000
xp = np.linspace(0, 1, n)
x = np.random.rand(m)
fp = np.random.rand(m, n)
%timeit a()
100 loops, best of 3: 4.14 ms per loop
%timeit b()
100 loops, best of 3: 2.97 ms per loop
%timeit multiInterp()
1000 loops, best of 3: 1.42 ms per loop
But for even larger ones it falls behind:
n = 1000
m = 10000
%timeit a()
10 loops, best of 3: 43.3 ms per loop
%timeit b()
10 loops, best of 3: 32.9 ms per loop
%timeit multiInterp()
10 loops, best of 3: 132 ms per loop
Finally, for really big arrays (I'm on 32 bit) the temporary arrays become a problem:
n = 10000
m = 10000
%timeit a()
10 loops, best of 3: 46.2 ms per loop
%timeit b()
10 loops, best of 3: 32.1 ms per loop
%timeit multiInterp()
# MemoryError
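The MemoryError is consistent with how multiInterp is written: the broadcast comparison materializes a full m-by-n boolean array, and np.diff(fp) allocates another m-by-(n-1) float array on top of fp itself. A rough back-of-the-envelope estimate, assuming fp is float64 and ignoring any further intermediates:

```python
import numpy as np

m, n = 10000, 10000

# xp[None, :] < x[:, None]  ->  m x n booleans
bool_bytes = m * n * np.dtype(bool).itemsize
# np.diff(fp)               ->  m x (n - 1) float64 values
diff_bytes = m * (n - 1) * np.dtype(np.float64).itemsize

print(bool_bytes / 1e6, "MB for the boolean mask")
print(diff_bytes / 1e6, "MB for np.diff(fp)")
```

By contrast, the searchsorted-based approach only needs temporaries of length m, which would explain why it copes with these sizes.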