如何加速 scipy.map_coordinates 多次插值?
How to accelerate scipy.map_coordinates for multiple interpolations?
我有几个值 f、g、h,它们是在同一个规则网格 (x、y、z) 上定义的,我想将它们插入到新网格 (x1、y1、z1) 中。即,我有 f(x, y, z) 、 g(x, y, z) 、 h(x, y, z) 并且我想计算 f(x1, y1, z1), g(x1, y1, z1), h(x1, y1, z1).
我目前正在使用 scipy.map_coordinates
。但是每次插值都是单独做的,点数在4,000,000左右,所以比较慢
from scipy.ndimage import map_coordinates
import numpy as np
# examples of f, g, h
f=np.random.randn(100,50,50)
g=np.random.randn(100,50,50)
h=np.random.randn(100,50,50)
# examples of x1, y1, z1
x1=np.random.rand(4000000)*100
y1=np.random.rand(4000000)*50
z1=np.random.rand(4000000)*50
# my solution at the moment
coords=np.array([x1,y1,z1])
out = np.zeros((3, coords.shape[1]))
out[0]= map_coordinates(f, coords, order=1)
out[1]= map_coordinates(g, coords, order=1)
out[2]= map_coordinates(h, coords, order=1)
有没有办法加快计算速度?
我尝试了一下,但不幸的是,它没有击败 scipy map_coordinates
功能。在我的笔记本电脑上,对 map_coordinates
的三个调用总共花费大约 1.0 秒,即每个数组每个坐标元组需要 80 纳秒。有 300 个时钟周期 (3.7 GHz CPU),这听起来很多,但事实证明还有很多工作要做。
部分工作是将浮点坐标拆分为整数部分和小数部分。这部分作业只需对三个输入数组 f、g 和 h 执行一次。不幸的是,这只需要大约 100 毫秒。只是有很多乘法和加法要做。
我使用 numba JIT 编译代码实现它,并注意内存中的数组布局,以便缓存访问相当高效,但它仍然 运行 比 scipy.ndimage.map_coordinates
慢 1.3 倍。 (编辑:max9111 在单独的答案中提供了显着的改进。)
我更改了您的坐标初始化以确保不需要越界处理:
n = 4000_000
x1=np.random.rand(n)*99
y1=np.random.rand(n)*49
z1=np.random.rand(n)*49
实施:
from numba import njit
@njit(fastmath=True)
def mymap(ars, coords):
"""ars is input arrays, shape (m, nx, ny, nz)
coords is coordinate array, float, shape (3, n)
"""
# these have shape (n, 3)
ijk = coords.T.astype(np.int16).copy() # copy for memory layout
fijk = (coords.T - ijk).astype(np.float32)
n = ijk.shape[0]
m = ars.shape[0]
out = np.empty((n, m), dtype=np.float64)
for l in range(n):
i0, j0, k0 = ijk[l, :3]
# Note: don't write i1, j1, k1 = ijk[l, :3]+1 -- much slower.
i1, j1, k1 = i0+1, j0+1, k0+1
fi1, fj1, fk1 = fijk[l, :3]
fi0, fj0, fk0 = 1-fi1, 1-fj1, 1-fk1
out[l, :] = (
fi0 * fj0 * fk0 * ars[:, i0, j0, k0] +
fi0 * fj0 * fk1 * ars[:, i0, j0, k1] +
fi0 * fj1 * fk0 * ars[:, i0, j1, k0] +
fi0 * fj1 * fk1 * ars[:, i0, j1, k1] +
fi1 * fj0 * fk0 * ars[:, i1, j0, k0] +
fi1 * fj0 * fk1 * ars[:, i1, j0, k1] +
fi1 * fj1 * fk0 * ars[:, i1, j1, k0] +
fi1 * fj1 * fk1 * ars[:, i1, j1, k1]
)
return out.T
fgh = np.array([f, g, h]).T.copy().T # optimize memory layout
out = mymap(fgh, coords)
每个坐标元组和每个输入数组,有 24 次浮点乘法和 7 次浮点加法。此外,还有一堆需要整数乘法的数组索引。输入数组之间共享的算术量相当小。
这只是对@Han-Kwang Nienhuys 回答的简短评论。
这里主要要改进的是避免矢量化命令,这会导致相当高的性能下降。
如果您使用默认的 C 序数组,通常最好更改输入和输出的数组形状 (n,3) 而不是 (3,n)。
输入
import numpy as np
import numba as nb
from scipy.ndimage import map_coordinates
# examples of f, g, h
f=np.random.randn(100,50,50)
g=np.random.randn(100,50,50)
h=np.random.randn(100,50,50)
n=4_000_000
# examples of x1, y1, z1
x1=np.random.rand(n)*99
y1=np.random.rand(n)*49
z1=np.random.rand(n)*49
coords=np.array((x1,y1,z1))
fgh = np.array([f, g, h]).T.copy().T # optimize memory layout
代码
#from Han-Kwang Nienhuys
@nb.njit(fastmath=True)
def mymap(ars, coords):
"""ars is input arrays, shape (m, nx, ny, nz)
coords is coordinate array, float, shape (3, n)
"""
# these have shape (n, 3)
ijk = coords.T.astype(np.int16)
fijk = (coords.T - ijk).astype(np.float32)
n = ijk.shape[0]
m = ars.shape[0]
out = np.empty((n, m), dtype=np.float64)
for l in range(n):
i0, j0, k0 = ijk[l, :3]
# Note: don't write i1, j1, k1 = ijk[l, :3]+1 -- much slower.
i1, j1, k1 = i0+1, j0+1, k0+1
fi1, fj1, fk1 = fijk[l, :3]
fi0, fj0, fk0 = 1-fi1, 1-fj1, 1-fk1
out[l, :] = (
fi0 * fj0 * fk0 * ars[:, i0, j0, k0] +
fi0 * fj0 * fk1 * ars[:, i0, j0, k1] +
fi0 * fj1 * fk0 * ars[:, i0, j1, k0] +
fi0 * fj1 * fk1 * ars[:, i0, j1, k1] +
fi1 * fj0 * fk0 * ars[:, i1, j0, k0] +
fi1 * fj0 * fk1 * ars[:, i1, j0, k1] +
fi1 * fj1 * fk0 * ars[:, i1, j1, k0] +
fi1 * fj1 * fk1 * ars[:, i1, j1, k1]
)
return out.T
#optimized version
@nb.njit(fastmath=True,parallel=False)
def mymap_opt(ars, coords):
"""ars is input arrays, shape (m, nx, ny, nz)
coords is coordinate array, float, shape (3, n)
"""
# these have shape (n, 3)
ijk = coords.T.astype(np.int16)
fijk = (coords.T - ijk).astype(np.float32)
n = ijk.shape[0]
m = ars.shape[0]
out = np.empty((n, m), dtype=np.float64)
for l in nb.prange(n):
i0= ijk[l, 0]
j0= ijk[l, 1]
k0 =ijk[l, 2]
# Note: don't write i1, j1, k1 = ijk[l, :3]+1 -- much slower.
i1, j1, k1 = i0+1, j0+1, k0+1
fi1= fijk[l, 0]
fj1= fijk[l, 1]
fk1 = fijk[l, 2]
fi0, fj0, fk0 = 1-fi1, 1-fj1, 1-fk1
for i in range(ars.shape[0]):
out[l, i] = (
fi0 * fj0 * fk0 * ars[i, i0, j0, k0] +
fi0 * fj0 * fk1 * ars[i, i0, j0, k1] +
fi0 * fj1 * fk0 * ars[i, i0, j1, k0] +
fi0 * fj1 * fk1 * ars[i, i0, j1, k1] +
fi1 * fj0 * fk0 * ars[i, i1, j0, k0] +
fi1 * fj0 * fk1 * ars[i, i1, j0, k1] +
fi1 * fj1 * fk0 * ars[i, i1, j1, k0] +
fi1 * fj1 * fk1 * ars[i, i1, j1, k1]
)
return out.T
时间
out_1 = mymap(fgh, coords)
out_2 = mymap_opt(fgh, coords)
print(np.allclose(out_1,out_2))
#True
%timeit out = mymap(fgh, coords)
#1.09 s ± 13.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit out = mymap_opt(fgh, coords)
#parallel=True
#144 ms ± 5.15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#parallel=False
#259 ms ± 4.76 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
我有几个值 f、g、h,它们是在同一个规则网格 (x、y、z) 上定义的,我想将它们插入到新网格 (x1、y1、z1) 中。即,我有 f(x, y, z) 、 g(x, y, z) 、 h(x, y, z) 并且我想计算 f(x1, y1, z1), g(x1, y1, z1), h(x1, y1, z1).
我目前正在使用 scipy.map_coordinates
。但是每次插值都是单独做的,点数在4,000,000左右,所以比较慢
from scipy.ndimage import map_coordinates
import numpy as np
# examples of f, g, h
f=np.random.randn(100,50,50)
g=np.random.randn(100,50,50)
h=np.random.randn(100,50,50)
# examples of x1, y1, z1
x1=np.random.rand(4000000)*100
y1=np.random.rand(4000000)*50
z1=np.random.rand(4000000)*50
# my solution at the moment
coords=np.array([x1,y1,z1])
out = np.zeros((3, coords.shape[1]))
out[0]= map_coordinates(f, coords, order=1)
out[1]= map_coordinates(g, coords, order=1)
out[2]= map_coordinates(h, coords, order=1)
有没有办法加快计算速度?
我尝试了一下,但不幸的是,它没有击败 scipy map_coordinates
功能。在我的笔记本电脑上,对 map_coordinates
的三个调用总共花费大约 1.0 秒,即每个数组每个坐标元组需要 80 纳秒。有 300 个时钟周期 (3.7 GHz CPU),这听起来很多,但事实证明还有很多工作要做。
部分工作是将浮点坐标拆分为整数部分和小数部分。这部分作业只需对三个输入数组 f、g 和 h 执行一次。不幸的是,这只需要大约 100 毫秒。只是有很多乘法和加法要做。
我使用 numba JIT 编译代码实现它,并注意内存中的数组布局,以便缓存访问相当高效,但它仍然 运行 比 scipy.ndimage.map_coordinates
慢 1.3 倍。 (编辑:max9111 在单独的答案中提供了显着的改进。)
我更改了您的坐标初始化以确保不需要越界处理:
n = 4000_000
x1=np.random.rand(n)*99
y1=np.random.rand(n)*49
z1=np.random.rand(n)*49
实施:
from numba import njit
@njit(fastmath=True)
def mymap(ars, coords):
"""ars is input arrays, shape (m, nx, ny, nz)
coords is coordinate array, float, shape (3, n)
"""
# these have shape (n, 3)
ijk = coords.T.astype(np.int16).copy() # copy for memory layout
fijk = (coords.T - ijk).astype(np.float32)
n = ijk.shape[0]
m = ars.shape[0]
out = np.empty((n, m), dtype=np.float64)
for l in range(n):
i0, j0, k0 = ijk[l, :3]
# Note: don't write i1, j1, k1 = ijk[l, :3]+1 -- much slower.
i1, j1, k1 = i0+1, j0+1, k0+1
fi1, fj1, fk1 = fijk[l, :3]
fi0, fj0, fk0 = 1-fi1, 1-fj1, 1-fk1
out[l, :] = (
fi0 * fj0 * fk0 * ars[:, i0, j0, k0] +
fi0 * fj0 * fk1 * ars[:, i0, j0, k1] +
fi0 * fj1 * fk0 * ars[:, i0, j1, k0] +
fi0 * fj1 * fk1 * ars[:, i0, j1, k1] +
fi1 * fj0 * fk0 * ars[:, i1, j0, k0] +
fi1 * fj0 * fk1 * ars[:, i1, j0, k1] +
fi1 * fj1 * fk0 * ars[:, i1, j1, k0] +
fi1 * fj1 * fk1 * ars[:, i1, j1, k1]
)
return out.T
fgh = np.array([f, g, h]).T.copy().T # optimize memory layout
out = mymap(fgh, coords)
每个坐标元组和每个输入数组,有 24 次浮点乘法和 7 次浮点加法。此外,还有一堆需要整数乘法的数组索引。输入数组之间共享的算术量相当小。
这只是对@Han-Kwang Nienhuys 回答的简短评论。 这里主要要改进的是避免矢量化命令,这会导致相当高的性能下降。
如果您使用默认的 C 序数组,通常最好更改输入和输出的数组形状 (n,3) 而不是 (3,n)。
输入
import numpy as np
import numba as nb
from scipy.ndimage import map_coordinates
# examples of f, g, h
f=np.random.randn(100,50,50)
g=np.random.randn(100,50,50)
h=np.random.randn(100,50,50)
n=4_000_000
# examples of x1, y1, z1
x1=np.random.rand(n)*99
y1=np.random.rand(n)*49
z1=np.random.rand(n)*49
coords=np.array((x1,y1,z1))
fgh = np.array([f, g, h]).T.copy().T # optimize memory layout
代码
#from Han-Kwang Nienhuys
@nb.njit(fastmath=True)
def mymap(ars, coords):
"""ars is input arrays, shape (m, nx, ny, nz)
coords is coordinate array, float, shape (3, n)
"""
# these have shape (n, 3)
ijk = coords.T.astype(np.int16)
fijk = (coords.T - ijk).astype(np.float32)
n = ijk.shape[0]
m = ars.shape[0]
out = np.empty((n, m), dtype=np.float64)
for l in range(n):
i0, j0, k0 = ijk[l, :3]
# Note: don't write i1, j1, k1 = ijk[l, :3]+1 -- much slower.
i1, j1, k1 = i0+1, j0+1, k0+1
fi1, fj1, fk1 = fijk[l, :3]
fi0, fj0, fk0 = 1-fi1, 1-fj1, 1-fk1
out[l, :] = (
fi0 * fj0 * fk0 * ars[:, i0, j0, k0] +
fi0 * fj0 * fk1 * ars[:, i0, j0, k1] +
fi0 * fj1 * fk0 * ars[:, i0, j1, k0] +
fi0 * fj1 * fk1 * ars[:, i0, j1, k1] +
fi1 * fj0 * fk0 * ars[:, i1, j0, k0] +
fi1 * fj0 * fk1 * ars[:, i1, j0, k1] +
fi1 * fj1 * fk0 * ars[:, i1, j1, k0] +
fi1 * fj1 * fk1 * ars[:, i1, j1, k1]
)
return out.T
#optimized version
@nb.njit(fastmath=True,parallel=False)
def mymap_opt(ars, coords):
"""ars is input arrays, shape (m, nx, ny, nz)
coords is coordinate array, float, shape (3, n)
"""
# these have shape (n, 3)
ijk = coords.T.astype(np.int16)
fijk = (coords.T - ijk).astype(np.float32)
n = ijk.shape[0]
m = ars.shape[0]
out = np.empty((n, m), dtype=np.float64)
for l in nb.prange(n):
i0= ijk[l, 0]
j0= ijk[l, 1]
k0 =ijk[l, 2]
# Note: don't write i1, j1, k1 = ijk[l, :3]+1 -- much slower.
i1, j1, k1 = i0+1, j0+1, k0+1
fi1= fijk[l, 0]
fj1= fijk[l, 1]
fk1 = fijk[l, 2]
fi0, fj0, fk0 = 1-fi1, 1-fj1, 1-fk1
for i in range(ars.shape[0]):
out[l, i] = (
fi0 * fj0 * fk0 * ars[i, i0, j0, k0] +
fi0 * fj0 * fk1 * ars[i, i0, j0, k1] +
fi0 * fj1 * fk0 * ars[i, i0, j1, k0] +
fi0 * fj1 * fk1 * ars[i, i0, j1, k1] +
fi1 * fj0 * fk0 * ars[i, i1, j0, k0] +
fi1 * fj0 * fk1 * ars[i, i1, j0, k1] +
fi1 * fj1 * fk0 * ars[i, i1, j1, k0] +
fi1 * fj1 * fk1 * ars[i, i1, j1, k1]
)
return out.T
时间
out_1 = mymap(fgh, coords)
out_2 = mymap_opt(fgh, coords)
print(np.allclose(out_1,out_2))
#True
%timeit out = mymap(fgh, coords)
#1.09 s ± 13.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit out = mymap_opt(fgh, coords)
#parallel=True
#144 ms ± 5.15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#parallel=False
#259 ms ± 4.76 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)