如何广播或向量化使用 scipy.ndimage map_coordinates 的二维数组的线性插值?
How to broadcast or vectorize a linear interpolation of a 2D array that uses scipy.ndimage map_coordinates?
我最近在性能方面遇到了障碍。我知道如何手动循环并通过 brute-forcing/looping 二维数组中的每一行和每一列进行从原始单元格到所有其他单元格的插值。
然而,当我处理形状为 (3000, 3000) 的二维数组时,线性间距和插值会停滞不前并严重影响性能。
我正在寻找优化此循环的方法,我知道矢量化和广播,只是不确定如何在这种情况下应用它。
我会用代码和数字来解释
import numpy as np
from scipy.ndimage import map_coordinates
m = np.array([
[10,10,10,10,10,10],
[9,9,9,10,9,9],
[9,8,9,10,8,9],
[9,7,8,0,8,9],
[8,7,7,8,8,9],
[5,6,7,7,6,7]])
origin_row = 3
origin_col = 3
m_max = np.zeros(m.shape)
m_dist = np.zeros(m.shape)
rows, cols = m.shape
for col in range(cols):
for row in range(rows):
# Get spacing linear interpolation
x_plot = np.linspace(col, origin_col, 5)
y_plot = np.linspace(row, origin_row, 5)
# grab the interpolated line
interpolated_line = map_coordinates(m,
np.vstack((y_plot,
x_plot)),
order=1, mode='nearest')
m_max[row][col] = max(interpolated_line)
m_dist[row][col] = np.argmax(interpolated_line)
print(m)
print(m_max)
print(m_dist)
如您所见,这是非常暴力的,我已经设法广播了这部分的所有代码,但卡在了这部分。
这是我要实现的目标的说明,我将进行第一次迭代
1.) 输入数组
2.) 第一个循环从0,0到原点(3,3)
3.) 这将 return [10 9 9 8 0] 并且最大值将为 10 并且索引将为 0
5.) 这是我使用的示例数组的输出
这是根据已接受的答案更新的性能。
这是一种矢量化方法。它不是很优化,可能会有一两个索引偏移错误,但它可能会给你一些想法。
两个示例,一个单色 384x512 测试图案和一个 "real" 3 通道 768x1024 图像。两者都是 uint8。
这在我的机器上需要半分钟。
对于较大的图像,需要比我拥有的 (8GB) 更多的 RAM。或者必须将其分解成更小的块。
和代码
import numpy as np
def rays(img, ctr):
M, N, *d = img.shape
aidx = 2*(slice(None),) + (img.ndim-2)*(None,)
m, n = ctr
out = np.empty_like(img)
offsI = np.empty(img.shape, np.uint16)
offsJ = np.empty(img.shape, np.uint16)
img4, out4, I4, J4 = ((x[m:, n:], x[m:, n::-1], x[m::-1, n:], x[m::-1, n::-1]) for x in (img, out, offsI, offsJ))
for i, o, y, x in zip(img4, out4, I4, J4):
for _ in range(2):
M, N, *d = i.shape
widths = np.arange(1, M+1, dtype=np.uint16).clip(None, N)
I = np.arange(M, dtype=np.uint16).repeat(widths)
J = np.ones_like(I)
J[0] = 0
J[widths[:-1].cumsum()] -= widths[:-1]
J = J.cumsum(dtype=np.uint16)
ii = np.arange(1, 2*M-1, dtype=np.uint16) // 2
II = ii.clip(None, I[:, None])
jj = np.arange(2*M-2, dtype=np.uint32) // 2 * 2 + 1
jj[0] = 0
JJ = ((1 + jj) * J[:, None] // (2*(I+1))[:, None]).astype(np.uint16).clip(None, J[:, None])
idx = i[II, JJ].argmax(axis=1)
II, JJ = (np.take_along_axis(ZZ[aidx] , idx[:, None], 1)[:, 0] for ZZ in (II, JJ))
y[I, J], x[I, J] = II, JJ
SH = II, JJ, *np.ogrid[tuple(map(slice, img.shape))][2:]
o[I, J] = i[SH]
i, o = i.swapaxes(0, 1), o.swapaxes(0, 1)
y, x = x.swapaxes(0, 1), y.swapaxes(0, 1)
return out, offsI, offsJ
from scipy.misc import face
f = face()
fr, *fidx = rays(f, (200, 400))
s = np.uint8((np.arange(384)[:, None] % 41 < 2)&(np.arange(512) % 41 < 2))
s = 255*s + 128*s[::-1, ::-1] + 64*s[::-1] + 32*s[:, ::-1]
sr, *sidx = rays(s, (200, 400))
import Image
Image.fromarray(f).show()
Image.fromarray(fr).show()
Image.fromarray(s).show()
Image.fromarray(sr).show()
为了加快代码速度,您可以先在循环外创建 x_plot
和 y_plot
,而不是每次都创建几次:
#this would be outside of the loops
num = 5
lin_col = np.array([np.linspace(i, origin_col, num) for i in range(cols)])
lin_row = np.array([np.linspace(i, origin_row, num) for i in range(rows)])
然后您可以在每个循环中通过 x_plot = lin_col[col]
和 y_plot = lin_row[row]
访问它们
其次,您可以通过对每对夫妇(row
、col
)的多个 v_stack
使用 map_coordinates
来避免这两个循环。为此,您可以使用 np.tile
and np.ravel
创建 x_plot
和 y_plot
的所有组合,例如:
arr_vs = np.vstack(( np.tile( lin_row, cols).ravel(),
np.tile( lin_col.ravel(), rows)))
注意ravel
并不是每次都用在同一个地方才能得到所有的组合。现在,您可以将 map_coordinates
与此 arr_vs
和 reshape
结合使用 rows
、cols
和 num
的结果来获得每个 interpolated_line
在 3D 数组的最后一个轴中:
arr_map = map_coordinates(m, arr_vs, order=1, mode='nearest').reshape(rows,cols,num)
最后可以在arr_map
的最后一个轴上使用np.max
和np.argmax
得到结果m_max
和m_dist
。所以所有的代码都是:
import numpy as np
from scipy.ndimage import map_coordinates
m = np.array([
[10,10,10,10,10,10],
[9,9,9,10,9,9],
[9,8,9,10,8,9],
[9,7,8,0,8,9],
[8,7,7,8,8,9],
[5,6,7,7,6,7]])
origin_row = 3
origin_col = 3
rows, cols = m.shape
num = 5
lin_col = np.array([np.linspace(i, origin_col, num) for i in range(cols)])
lin_row = np.array([np.linspace(i, origin_row, num) for i in range(rows)])
arr_vs = np.vstack(( np.tile( lin_row, cols).ravel(),
np.tile( lin_col.ravel(), rows)))
arr_map = map_coordinates(m, arr_vs, order=1, mode='nearest').reshape(rows,cols,num)
m_max = np.max( arr_map, axis=-1)
m_dist = np.argmax( arr_map, axis=-1)
print (m_max)
print (m_dist)
你得到了预期的结果:
#m_max
array([[10, 10, 10, 10, 10, 10],
[ 9, 9, 10, 10, 9, 9],
[ 9, 9, 9, 10, 8, 9],
[ 9, 8, 8, 0, 8, 9],
[ 8, 8, 7, 8, 8, 9],
[ 7, 7, 8, 8, 8, 8]])
#m_dist
array([[0, 0, 0, 0, 0, 0],
[0, 0, 2, 0, 0, 0],
[0, 2, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[0, 2, 0, 0, 0, 0],
[1, 1, 2, 1, 2, 1]])
编辑:lin_col
和 lin_row
相关,所以你可以做得更快:
if cols >= rows:
arr = np.arange(cols)[:,None]
lin_col = arr + (origin_col-arr)/(num-1.)*np.arange(num)
lin_row = lin_col[:rows] + np.linspace(0, origin_row - origin_col, num)[None,:]
else:
arr = np.arange(rows)[:,None]
lin_row = arr + (origin_row-arr)/(num-1.)*np.arange(num)
lin_col = lin_row[:cols] + np.linspace(0, origin_col - origin_row, num)[None,:]
我最近在性能方面遇到了障碍。我知道如何手动循环并通过 brute-forcing/looping 二维数组中的每一行和每一列进行从原始单元格到所有其他单元格的插值。
然而,当我处理形状为 (3000, 3000) 的二维数组时,线性间距和插值会停滞不前并严重影响性能。
我正在寻找优化此循环的方法,我知道矢量化和广播,只是不确定如何在这种情况下应用它。
我会用代码和数字来解释
import numpy as np
from scipy.ndimage import map_coordinates
m = np.array([
[10,10,10,10,10,10],
[9,9,9,10,9,9],
[9,8,9,10,8,9],
[9,7,8,0,8,9],
[8,7,7,8,8,9],
[5,6,7,7,6,7]])
origin_row = 3
origin_col = 3
m_max = np.zeros(m.shape)
m_dist = np.zeros(m.shape)
rows, cols = m.shape
for col in range(cols):
for row in range(rows):
# Get spacing linear interpolation
x_plot = np.linspace(col, origin_col, 5)
y_plot = np.linspace(row, origin_row, 5)
# grab the interpolated line
interpolated_line = map_coordinates(m,
np.vstack((y_plot,
x_plot)),
order=1, mode='nearest')
m_max[row][col] = max(interpolated_line)
m_dist[row][col] = np.argmax(interpolated_line)
print(m)
print(m_max)
print(m_dist)
如您所见,这是非常暴力的,我已经设法广播了这部分的所有代码,但卡在了这部分。 这是我要实现的目标的说明,我将进行第一次迭代
1.) 输入数组
2.) 第一个循环从0,0到原点(3,3)
3.) 这将 return [10 9 9 8 0] 并且最大值将为 10 并且索引将为 0
5.) 这是我使用的示例数组的输出
这是根据已接受的答案更新的性能。
这是一种矢量化方法。它不是很优化,可能会有一两个索引偏移错误,但它可能会给你一些想法。
两个示例,一个单色 384x512 测试图案和一个 "real" 3 通道 768x1024 图像。两者都是 uint8。 这在我的机器上需要半分钟。
对于较大的图像,需要比我拥有的 (8GB) 更多的 RAM。或者必须将其分解成更小的块。
和代码
import numpy as np
def rays(img, ctr):
M, N, *d = img.shape
aidx = 2*(slice(None),) + (img.ndim-2)*(None,)
m, n = ctr
out = np.empty_like(img)
offsI = np.empty(img.shape, np.uint16)
offsJ = np.empty(img.shape, np.uint16)
img4, out4, I4, J4 = ((x[m:, n:], x[m:, n::-1], x[m::-1, n:], x[m::-1, n::-1]) for x in (img, out, offsI, offsJ))
for i, o, y, x in zip(img4, out4, I4, J4):
for _ in range(2):
M, N, *d = i.shape
widths = np.arange(1, M+1, dtype=np.uint16).clip(None, N)
I = np.arange(M, dtype=np.uint16).repeat(widths)
J = np.ones_like(I)
J[0] = 0
J[widths[:-1].cumsum()] -= widths[:-1]
J = J.cumsum(dtype=np.uint16)
ii = np.arange(1, 2*M-1, dtype=np.uint16) // 2
II = ii.clip(None, I[:, None])
jj = np.arange(2*M-2, dtype=np.uint32) // 2 * 2 + 1
jj[0] = 0
JJ = ((1 + jj) * J[:, None] // (2*(I+1))[:, None]).astype(np.uint16).clip(None, J[:, None])
idx = i[II, JJ].argmax(axis=1)
II, JJ = (np.take_along_axis(ZZ[aidx] , idx[:, None], 1)[:, 0] for ZZ in (II, JJ))
y[I, J], x[I, J] = II, JJ
SH = II, JJ, *np.ogrid[tuple(map(slice, img.shape))][2:]
o[I, J] = i[SH]
i, o = i.swapaxes(0, 1), o.swapaxes(0, 1)
y, x = x.swapaxes(0, 1), y.swapaxes(0, 1)
return out, offsI, offsJ
from scipy.misc import face
f = face()
fr, *fidx = rays(f, (200, 400))
s = np.uint8((np.arange(384)[:, None] % 41 < 2)&(np.arange(512) % 41 < 2))
s = 255*s + 128*s[::-1, ::-1] + 64*s[::-1] + 32*s[:, ::-1]
sr, *sidx = rays(s, (200, 400))
import Image
Image.fromarray(f).show()
Image.fromarray(fr).show()
Image.fromarray(s).show()
Image.fromarray(sr).show()
为了加快代码速度,您可以先在循环外创建 x_plot
和 y_plot
,而不是每次都创建几次:
#this would be outside of the loops
num = 5
lin_col = np.array([np.linspace(i, origin_col, num) for i in range(cols)])
lin_row = np.array([np.linspace(i, origin_row, num) for i in range(rows)])
然后您可以在每个循环中通过 x_plot = lin_col[col]
和 y_plot = lin_row[row]
其次,您可以通过对每对夫妇(row
、col
)的多个 v_stack
使用 map_coordinates
来避免这两个循环。为此,您可以使用 np.tile
and np.ravel
创建 x_plot
和 y_plot
的所有组合,例如:
arr_vs = np.vstack(( np.tile( lin_row, cols).ravel(),
np.tile( lin_col.ravel(), rows)))
注意ravel
并不是每次都用在同一个地方才能得到所有的组合。现在,您可以将 map_coordinates
与此 arr_vs
和 reshape
结合使用 rows
、cols
和 num
的结果来获得每个 interpolated_line
在 3D 数组的最后一个轴中:
arr_map = map_coordinates(m, arr_vs, order=1, mode='nearest').reshape(rows,cols,num)
最后可以在arr_map
的最后一个轴上使用np.max
和np.argmax
得到结果m_max
和m_dist
。所以所有的代码都是:
import numpy as np
from scipy.ndimage import map_coordinates
m = np.array([
[10,10,10,10,10,10],
[9,9,9,10,9,9],
[9,8,9,10,8,9],
[9,7,8,0,8,9],
[8,7,7,8,8,9],
[5,6,7,7,6,7]])
origin_row = 3
origin_col = 3
rows, cols = m.shape
num = 5
lin_col = np.array([np.linspace(i, origin_col, num) for i in range(cols)])
lin_row = np.array([np.linspace(i, origin_row, num) for i in range(rows)])
arr_vs = np.vstack(( np.tile( lin_row, cols).ravel(),
np.tile( lin_col.ravel(), rows)))
arr_map = map_coordinates(m, arr_vs, order=1, mode='nearest').reshape(rows,cols,num)
m_max = np.max( arr_map, axis=-1)
m_dist = np.argmax( arr_map, axis=-1)
print (m_max)
print (m_dist)
你得到了预期的结果:
#m_max
array([[10, 10, 10, 10, 10, 10],
[ 9, 9, 10, 10, 9, 9],
[ 9, 9, 9, 10, 8, 9],
[ 9, 8, 8, 0, 8, 9],
[ 8, 8, 7, 8, 8, 9],
[ 7, 7, 8, 8, 8, 8]])
#m_dist
array([[0, 0, 0, 0, 0, 0],
[0, 0, 2, 0, 0, 0],
[0, 2, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[0, 2, 0, 0, 0, 0],
[1, 1, 2, 1, 2, 1]])
编辑:lin_col
和 lin_row
相关,所以你可以做得更快:
if cols >= rows:
arr = np.arange(cols)[:,None]
lin_col = arr + (origin_col-arr)/(num-1.)*np.arange(num)
lin_row = lin_col[:rows] + np.linspace(0, origin_row - origin_col, num)[None,:]
else:
arr = np.arange(rows)[:,None]
lin_row = arr + (origin_row-arr)/(num-1.)*np.arange(num)
lin_col = lin_row[:cols] + np.linspace(0, origin_col - origin_row, num)[None,:]