沿轴插入 numpy.ndarray 的最佳方法
Best way to interpolate a numpy.ndarray along an axis
我有 4 维数据,比如温度,在 numpy.ndarray
中。
数组的形状是(ntime, nheight_in, nlat, nlon)
.
我对每个维度都有相应的一维数组,告诉我某个值对应于哪个时间、高度、纬度和经度,对于这个例子,我需要 height_in
给出以米为单位的高度。
现在我需要将它带到不同的高度尺寸,height_out
,具有不同的长度。
以下似乎符合我的要求:
ntime, nheight_in, nlat, nlon = t_in.shape
nheight_out = len(height_out)
t_out = np.empty((ntime, nheight_out, nlat, nlon))
for time in range(ntime):
for lat in range(nlat):
for lon in range(nlon):
t_out[time, :, lat, lon] = np.interp(
height_out, height_in, t[time, :, lat, lon]
)
但是有 3 个嵌套循环,并且在 python 和 numpy 之间进行了大量切换,我认为这不是最好的方法。
有什么改进建议吗?谢谢
scipy
的 interp1d
可以帮助:
import numpy as np
from scipy.interpolate import interp1d
ntime, nheight_in, nlat, nlon = (10, 20, 30, 40)
heights = np.linspace(0, 1, nheight_in)
t_in = np.random.normal(size=(ntime, nheight_in, nlat, nlon))
f_out = interp1d(heights, t_in, axis=1)
nheight_out = 50
new_heights = np.linspace(0, 1, nheight_out)
t_out = f_out(new_heights)
我一直在寻找适用于不规则间隔坐标的类似函数,并最终编写了我自己的函数。据我所知,插值处理得很好,内存和速度方面的性能也相当不错。我想我会在这里分享它,以防其他人遇到这个问题寻找类似的功能:
import numpy as np
import warnings
def interp_along_axis(y, x, newx, axis, inverse=False, method='linear'):
""" Interpolate vertical profiles, e.g. of atmospheric variables
using vectorized numpy operations
This function assumes that the x-xoordinate increases monotonically
ps:
* Updated to work with irregularly spaced x-coordinate.
* Updated to work with irregularly spaced newx-coordinate
* Updated to easily inverse the direction of the x-coordinate
* Updated to fill with nans outside extrapolation range
* Updated to include a linear interpolation method as well
(it was initially written for a cubic function)
Peter Kalverla
March 2018
--------------------
More info:
Algorithm from: http://www.paulinternet.nl/?page=bicubic
It approximates y = f(x) = ax^3 + bx^2 + cx + d
where y may be an ndarray input vector
Returns f(newx)
The algorithm uses the derivative f'(x) = 3ax^2 + 2bx + c
and uses the fact that:
f(0) = d
f(1) = a + b + c + d
f'(0) = c
f'(1) = 3a + 2b + c
Rewriting this yields expressions for a, b, c, d:
a = 2f(0) - 2f(1) + f'(0) + f'(1)
b = -3f(0) + 3f(1) - 2f'(0) - f'(1)
c = f'(0)
d = f(0)
These can be evaluated at two neighbouring points in x and
as such constitute the piecewise cubic interpolator.
"""
# View of x and y with axis as first dimension
if inverse:
_x = np.moveaxis(x, axis, 0)[::-1, ...]
_y = np.moveaxis(y, axis, 0)[::-1, ...]
_newx = np.moveaxis(newx, axis, 0)[::-1, ...]
else:
_y = np.moveaxis(y, axis, 0)
_x = np.moveaxis(x, axis, 0)
_newx = np.moveaxis(newx, axis, 0)
# Sanity checks
if np.any(_newx[0] < _x[0]) or np.any(_newx[-1] > _x[-1]):
# raise ValueError('This function cannot extrapolate')
warnings.warn("Some values are outside the interpolation range. "
"These will be filled with NaN")
if np.any(np.diff(_x, axis=0) < 0):
raise ValueError('x should increase monotonically')
if np.any(np.diff(_newx, axis=0) < 0):
raise ValueError('newx should increase monotonically')
# Cubic interpolation needs the gradient of y in addition to its values
if method == 'cubic':
# For now, simply use a numpy function to get the derivatives
# This produces the largest memory overhead of the function and
# could alternatively be done in passing.
ydx = np.gradient(_y, axis=0, edge_order=2)
# This will later be concatenated with a dynamic '0th' index
ind = [i for i in np.indices(_y.shape[1:])]
# Allocate the output array
original_dims = _y.shape
newdims = list(original_dims)
newdims[0] = len(_newx)
newy = np.zeros(newdims)
# set initial bounds
i_lower = np.zeros(_x.shape[1:], dtype=int)
i_upper = np.ones(_x.shape[1:], dtype=int)
x_lower = _x[0, ...]
x_upper = _x[1, ...]
for i, xi in enumerate(_newx):
# Start at the 'bottom' of the array and work upwards
# This only works if x and newx increase monotonically
# Update bounds where necessary and possible
needs_update = (xi > x_upper) & (i_upper+1<len(_x))
# print x_upper.max(), np.any(needs_update)
while np.any(needs_update):
i_lower = np.where(needs_update, i_lower+1, i_lower)
i_upper = i_lower + 1
x_lower = _x[[i_lower]+ind]
x_upper = _x[[i_upper]+ind]
# Check again
needs_update = (xi > x_upper) & (i_upper+1<len(_x))
# Express the position of xi relative to its neighbours
xj = (xi-x_lower)/(x_upper - x_lower)
# Determine where there is a valid interpolation range
within_bounds = (_x[0, ...] < xi) & (xi < _x[-1, ...])
if method == 'linear':
f0, f1 = _y[[i_lower]+ind], _y[[i_upper]+ind]
a = f1 - f0
b = f0
newy[i, ...] = np.where(within_bounds, a*xj+b, np.nan)
elif method=='cubic':
f0, f1 = _y[[i_lower]+ind], _y[[i_upper]+ind]
df0, df1 = ydx[[i_lower]+ind], ydx[[i_upper]+ind]
a = 2*f0 - 2*f1 + df0 + df1
b = -3*f0 + 3*f1 - 2*df0 - df1
c = df0
d = f0
newy[i, ...] = np.where(within_bounds, a*xj**3 + b*xj**2 + c*xj + d, np.nan)
else:
raise ValueError("invalid interpolation method"
"(choose 'linear' or 'cubic')")
if inverse:
newy = newy[::-1, ...]
return np.moveaxis(newy, 0, axis)
这是一个测试它的小例子:
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d as scipy1d
# toy coordinates and data
nx, ny, nz = 25, 30, 10
x = np.arange(nx)
y = np.arange(ny)
z = np.tile(np.arange(nz), (nx,ny,1)) + np.random.randn(nx, ny, nz)*.1
testdata = np.random.randn(nx,ny,nz) # x,y,z
# Desired z-coordinates (must be between bounds of z)
znew = np.tile(np.linspace(2,nz-2,50), (nx,ny,1)) + np.random.randn(nx, ny, 50)*0.01
# Inverse the coordinates for testing
z = z[..., ::-1]
znew = znew[..., ::-1]
# Now use own routine
ynew = interp_along_axis(testdata, z, znew, axis=2, inverse=True)
# Check some random profiles
for i in range(5):
randx = np.random.randint(nx)
randy = np.random.randint(ny)
checkfunc = scipy1d(z[randx, randy], testdata[randx,randy], kind='cubic')
checkdata = checkfunc(znew)
fig, ax = plt.subplots()
ax.plot(testdata[randx, randy], z[randx, randy], 'x', label='original data')
ax.plot(checkdata[randx, randy], znew[randx, randy], label='scipy')
ax.plot(ynew[randx, randy], znew[randx, randy], '--', label='Peter')
ax.legend()
plt.show()
遵循 numpy.interp 的标准,可以将 left/right 边界分配给范围外的点,在 within_bounds = ...
之后添加此行
out_lbound = (xi <= _x[0,...])
out_rbound = (_x[-1,...] <= xi)
和
newy[i, out_lbound] = _y[0, out_lbound]
newy[i, out_rbound] = _y[-1, out_rbound]
在 newy[i, ...] = ...
之后。
如果我很好地理解@Peter9192 使用的策略,我认为更改是在同一行中。我检查了一下,但可能有些奇怪的情况无法正常工作。
我有 4 维数据,比如温度,在 numpy.ndarray
中。
数组的形状是(ntime, nheight_in, nlat, nlon)
.
我对每个维度都有相应的一维数组,告诉我某个值对应于哪个时间、高度、纬度和经度,对于这个例子,我需要 height_in
给出以米为单位的高度。
现在我需要将它带到不同的高度尺寸,height_out
,具有不同的长度。
以下似乎符合我的要求:
ntime, nheight_in, nlat, nlon = t_in.shape
nheight_out = len(height_out)
t_out = np.empty((ntime, nheight_out, nlat, nlon))
for time in range(ntime):
for lat in range(nlat):
for lon in range(nlon):
t_out[time, :, lat, lon] = np.interp(
height_out, height_in, t[time, :, lat, lon]
)
但是有 3 个嵌套循环,并且在 python 和 numpy 之间进行了大量切换,我认为这不是最好的方法。
有什么改进建议吗?谢谢
scipy
的 interp1d
可以帮助:
import numpy as np
from scipy.interpolate import interp1d
ntime, nheight_in, nlat, nlon = (10, 20, 30, 40)
heights = np.linspace(0, 1, nheight_in)
t_in = np.random.normal(size=(ntime, nheight_in, nlat, nlon))
f_out = interp1d(heights, t_in, axis=1)
nheight_out = 50
new_heights = np.linspace(0, 1, nheight_out)
t_out = f_out(new_heights)
我一直在寻找适用于不规则间隔坐标的类似函数,并最终编写了我自己的函数。据我所知,插值处理得很好,内存和速度方面的性能也相当不错。我想我会在这里分享它,以防其他人遇到这个问题寻找类似的功能:
import numpy as np
import warnings
def interp_along_axis(y, x, newx, axis, inverse=False, method='linear'):
""" Interpolate vertical profiles, e.g. of atmospheric variables
using vectorized numpy operations
This function assumes that the x-xoordinate increases monotonically
ps:
* Updated to work with irregularly spaced x-coordinate.
* Updated to work with irregularly spaced newx-coordinate
* Updated to easily inverse the direction of the x-coordinate
* Updated to fill with nans outside extrapolation range
* Updated to include a linear interpolation method as well
(it was initially written for a cubic function)
Peter Kalverla
March 2018
--------------------
More info:
Algorithm from: http://www.paulinternet.nl/?page=bicubic
It approximates y = f(x) = ax^3 + bx^2 + cx + d
where y may be an ndarray input vector
Returns f(newx)
The algorithm uses the derivative f'(x) = 3ax^2 + 2bx + c
and uses the fact that:
f(0) = d
f(1) = a + b + c + d
f'(0) = c
f'(1) = 3a + 2b + c
Rewriting this yields expressions for a, b, c, d:
a = 2f(0) - 2f(1) + f'(0) + f'(1)
b = -3f(0) + 3f(1) - 2f'(0) - f'(1)
c = f'(0)
d = f(0)
These can be evaluated at two neighbouring points in x and
as such constitute the piecewise cubic interpolator.
"""
# View of x and y with axis as first dimension
if inverse:
_x = np.moveaxis(x, axis, 0)[::-1, ...]
_y = np.moveaxis(y, axis, 0)[::-1, ...]
_newx = np.moveaxis(newx, axis, 0)[::-1, ...]
else:
_y = np.moveaxis(y, axis, 0)
_x = np.moveaxis(x, axis, 0)
_newx = np.moveaxis(newx, axis, 0)
# Sanity checks
if np.any(_newx[0] < _x[0]) or np.any(_newx[-1] > _x[-1]):
# raise ValueError('This function cannot extrapolate')
warnings.warn("Some values are outside the interpolation range. "
"These will be filled with NaN")
if np.any(np.diff(_x, axis=0) < 0):
raise ValueError('x should increase monotonically')
if np.any(np.diff(_newx, axis=0) < 0):
raise ValueError('newx should increase monotonically')
# Cubic interpolation needs the gradient of y in addition to its values
if method == 'cubic':
# For now, simply use a numpy function to get the derivatives
# This produces the largest memory overhead of the function and
# could alternatively be done in passing.
ydx = np.gradient(_y, axis=0, edge_order=2)
# This will later be concatenated with a dynamic '0th' index
ind = [i for i in np.indices(_y.shape[1:])]
# Allocate the output array
original_dims = _y.shape
newdims = list(original_dims)
newdims[0] = len(_newx)
newy = np.zeros(newdims)
# set initial bounds
i_lower = np.zeros(_x.shape[1:], dtype=int)
i_upper = np.ones(_x.shape[1:], dtype=int)
x_lower = _x[0, ...]
x_upper = _x[1, ...]
for i, xi in enumerate(_newx):
# Start at the 'bottom' of the array and work upwards
# This only works if x and newx increase monotonically
# Update bounds where necessary and possible
needs_update = (xi > x_upper) & (i_upper+1<len(_x))
# print x_upper.max(), np.any(needs_update)
while np.any(needs_update):
i_lower = np.where(needs_update, i_lower+1, i_lower)
i_upper = i_lower + 1
x_lower = _x[[i_lower]+ind]
x_upper = _x[[i_upper]+ind]
# Check again
needs_update = (xi > x_upper) & (i_upper+1<len(_x))
# Express the position of xi relative to its neighbours
xj = (xi-x_lower)/(x_upper - x_lower)
# Determine where there is a valid interpolation range
within_bounds = (_x[0, ...] < xi) & (xi < _x[-1, ...])
if method == 'linear':
f0, f1 = _y[[i_lower]+ind], _y[[i_upper]+ind]
a = f1 - f0
b = f0
newy[i, ...] = np.where(within_bounds, a*xj+b, np.nan)
elif method=='cubic':
f0, f1 = _y[[i_lower]+ind], _y[[i_upper]+ind]
df0, df1 = ydx[[i_lower]+ind], ydx[[i_upper]+ind]
a = 2*f0 - 2*f1 + df0 + df1
b = -3*f0 + 3*f1 - 2*df0 - df1
c = df0
d = f0
newy[i, ...] = np.where(within_bounds, a*xj**3 + b*xj**2 + c*xj + d, np.nan)
else:
raise ValueError("invalid interpolation method"
"(choose 'linear' or 'cubic')")
if inverse:
newy = newy[::-1, ...]
return np.moveaxis(newy, 0, axis)
这是一个测试它的小例子:
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d as scipy1d
# toy coordinates and data
nx, ny, nz = 25, 30, 10
x = np.arange(nx)
y = np.arange(ny)
z = np.tile(np.arange(nz), (nx,ny,1)) + np.random.randn(nx, ny, nz)*.1
testdata = np.random.randn(nx,ny,nz) # x,y,z
# Desired z-coordinates (must be between bounds of z)
znew = np.tile(np.linspace(2,nz-2,50), (nx,ny,1)) + np.random.randn(nx, ny, 50)*0.01
# Inverse the coordinates for testing
z = z[..., ::-1]
znew = znew[..., ::-1]
# Now use own routine
ynew = interp_along_axis(testdata, z, znew, axis=2, inverse=True)
# Check some random profiles
for i in range(5):
randx = np.random.randint(nx)
randy = np.random.randint(ny)
checkfunc = scipy1d(z[randx, randy], testdata[randx,randy], kind='cubic')
checkdata = checkfunc(znew)
fig, ax = plt.subplots()
ax.plot(testdata[randx, randy], z[randx, randy], 'x', label='original data')
ax.plot(checkdata[randx, randy], znew[randx, randy], label='scipy')
ax.plot(ynew[randx, randy], znew[randx, randy], '--', label='Peter')
ax.legend()
plt.show()
遵循 numpy.interp 的标准,可以将 left/right 边界分配给范围外的点,在 within_bounds = ...
out_lbound = (xi <= _x[0,...])
out_rbound = (_x[-1,...] <= xi)
和
newy[i, out_lbound] = _y[0, out_lbound]
newy[i, out_rbound] = _y[-1, out_rbound]
在 newy[i, ...] = ...
之后。
如果我很好地理解@Peter9192 使用的策略,我认为更改是在同一行中。我检查了一下,但可能有些奇怪的情况无法正常工作。