在对 ndarray 进行轻微更改后,Numpy ravel 花费的时间太长

Numpy ravel takes too long after a slight change to a ndarray

我正在处理一个展平图像 (1920x1080x4),我需要在其中重塑形状(例如 arr.reshape((1920,1080,4))),删除最后一个通道(例如 arr[:,:,:3]),从 BGR 转换为 RGB (例如 arr[:,:,::-1]),最后再次变平(例如 arr.ravel())。问题在于 ravel/flatten/reshape(-1) 操作,这会增加大约 20 毫秒的计算时间。

为了便于调试,我假设传入的数组是一个展平的 1920x1080x3 图像,这意味着我只需要担心 BGR 到 RGB 的转换和展平。但是,当测试 reshape+ravel、reshape+BGR2RGB 和最后的 reshape+BGR2RGB+ravel 时,结果分别为 1ms、1ms、20ms,这对我来说没有任何意义,因为它只是一些值在内存中改变位置. ravel 是否有任何理由创建数组的副本?我怎样才能减少这个时间?

注意: 我还测试了写在 numpy.reshape 文档注释中的 inplace reshape 方法,但是,正如指定的那样,出现了一个错误,这意味着需要先复制数组才能重塑。

下面是我用来测试的代码:

import numpy as np
from time import time

arr_original = np.ones((1920*1080*3), dtype=np.uint8)

arr = arr_original.copy()
s = time()
arr = arr.reshape(1920,1080,3)
arr = arr.ravel()
print(f"Reshape + ravel: {round(1000*(time()-s),2)}ms")

arr = arr_original.copy()
s = time()
arr = arr.reshape(1920,1080,3)
arr = arr[:,:,::-1]
print(f"Reshape + BGR2RGB: {round(1000*(time()-s),2)}ms")

arr = arr_original.copy()
s = time()
arr = arr.reshape(1920,1080,3)
arr = arr[:,:,::-1]
arr = arr.ravel()
print(f"Reshape + BGR2RGB + ravel: {round(1000*(time()-s),2)}ms")

在我的机器上输出

Reshape + ravel: 0.01ms
Reshape + BGR2RGB: 0.01ms
Reshape + BGR2RGB + ravel: 20.54ms

这是因为您上面的所有操作都在为相同的数据生成视图,但需要最后的理解才能制作副本。

numpy 数组中的数组具有底层内存,形状和步幅确定每个元素所在的位置。

可以通过简单地改变形状和步幅来重塑连续数组,而无需修改数据。这里的切片也是如此。但是由于你的最后一个数组不是连续的,当你使用 ravel 时它会复制所有内容。

例如在 3d 数组中访问元素 arr[i,j,k] 意味着访问 base + i * arr.strides[0] + j * arr.strides[1] + k * arr.strides[1] 处的内存你可以用这个做很多事情(如果你在给定的轴上使用步幅 0 甚至广播) .

arr_original = np.ones((1920*1080*4), dtype=np.uint8)
arr = arr_original
print(arr.shape, arr.strides)
arr = arr.reshape(1920,1080,4)
print(arr.shape, arr.strides)
arr = arr[:,:,:3] # keep strides only reduces the length of the last axis
print(arr.shape, arr.strides)
arr = arr[:,:,::-1] # change strides of last axis to -1
print(arr.shape, arr.strides)
arr[0,0,:] = [3,4,5] # operations here are using the memory allocated
arr[0,1,:] = [6,7,8] # for arr_original
arr = arr.ravel()
arr[:] = 0 # this won't affect the original because the data was copied
print(arr_original[:8])

改进您的解决方案

在这种情况下,您必须试验或深入研究库代码。我更喜欢测试不同的代码编写方式。

一般来说,原始方法是最好的方法,但在这种特定情况下,我们拥有的是未对齐的内存,因为您正在以步长 3 写入 uint8。

判断性能时,了解合理预期很重要,在这种情况下,我们可以将格式转换与纯副本进行比较

arr = arr_original.copy()

每个循环 1.89 ms ± 43.1 µs(7 次运行的平均值 ± 标准偏差,每次 100 次循环)

arr = arr_original
arr = arr.reshape(1920,1080,4)
arr = arr[:,:,:3] 
arr = arr[:,:,::-1]
arr[0,0,:] = [3,4,5] 
arr[0,1,:] = [6,7,8] 
arr = arr.ravel()

12.3 ms ± 101 µs 每个循环(7 次运行的平均值 ± 标准偏差,每次 100 次循环) (比副本慢大约 6 倍)

arr = arr_original
arr = arr.reshape(1920,1080,4)
arr_aux = np.empty(arr.shape[:-1] + (3,), dtype=np.uint8)
arr_aux[:,:,0] = arr[:,:,2]
arr_aux[:,:,1] = arr[:,:,1]
arr_aux[:,:,2] = arr[:,:,0]
arr = arr_aux.ravel()

4.16 ms ± 25 µs 每个循环(7 次运行的平均值 ± 标准偏差,每次 100 次循环) (比副本慢 2 倍)

分析

在第一种情况下,最后一个轴的尺寸也非常小,所以这可能会导致一个小循环。让我们看看如何将此操作投影到 C++

for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    // this part would be the bottleneck
    for(int k = 0; k < 3; ++k){
      dst[(width * i + j)*3 + k] = src[(width * i + j)*4 + k];
    }
  }
}

当然,numpy 做的事情远不止于此,通过将独立于循环变量的预计算部分移到循环外,可以更有效地计算索引。这里的想法是说教。

我们来统计执行的分支数,每个for循环将执行N+1个分支进行N次迭代(N次进入循环,最后一次跳转跳出循环)。所以上面的代码运行1 + height * (1 + 1 + width * (1 + 3)) ~ 4 * width * height个分支。

如果我们将最内层的循环展开为

for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    // this part would be the bottleneck
    dst[(width * i + j)*3 + 0] = src[(width * i + j)*4 + 0];
    dst[(width * i + j)*3 + 1] = src[(width * i + j)*4 + 1];
    dst[(width * i + j)*3 + 2] = src[(width * i + j)*4 + 2];
  }
}

分支数变成1 + height * (1 + 1 + width) ~ height * width,少了4倍。我们不能在 python 中执行此操作,因为我们无权访问内部循环。但是对于第二个代码,我们实现了类似

的东西
for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    // this part would be the bottleneck
    dst[(width * i + j)*3 + 0] = src[(width * i + j)*4 + 0];
  }
}

for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    dst[(width * i + j)*3 + 1] = src[(width * i + j)*4 + 1];
  }
}

for(int i = 0; i < height; ++i){
  for(int j = 0; j < width; ++j){
    dst[(width * i + j)*3 + 2] = src[(width * i + j)*4 + 2];
  }
}

那仍然比第一个分支少。

通过观察到的改进,我想最后一个循环必须调用类似 memcpy 的函数或其他具有更多开销的函数,以尝试更快地处理更大的切片,可能会检查内存对齐,但会失败,因为我们正在使用步幅为 3 的字节。