numpy 数组的快速条件重叠窗口(框架)

Fast conditional overlapping windowing (framing) of numpy array

我有一个巨大的 numpy 数组列表(一维),它们是不同事件的时间序列。每个点都有一个标签,我想 window 基于其标签的 numpy 数组。我的标签是 0、1 和 2。每个 window 都有固定大小 M.

每个 window 的标签将是 window 中可用的最大标签。因此,如果 window 由 0 和 1 标记的数据点组成,则整个 window.

的标签将为 1

但问题是,windowing 不是标签不可知论者。由于 class 不平衡,我只想在标签 1 和 2 的情况下重叠 windowing。

到目前为止我已经写了这段代码:

# conditional framing
data = []
start_cursor = 0
while start_cursor < arr.size:
  end_cursor = start_cursor + window_size
  data.append(
    {
      "frame": arr[start_cursor:end_cursor],
      "label": y[start_cursor:end_cursor].max(),
    }
  )
  start_cursor = end_cursor
  if np.any(y[start_cursor, end_cursor] != 0):
    start_cursor = start_cursor - overlap_size        

但这显然过于冗长而且效率低下,尤其是因为我将在我的大量单独数组列表上调用这个 while 循环。

编辑:更多地解释问题。假设你要 window 一个固定长度 M 的信号。如果 window 中只存在 0 个标签点,那么相邻 windows 之间不会有重叠。但如果存在标签 1 和 2,则两个信号之间将有百分比 p% 的重叠。

我认为这可以满足您的要求。用于检查的可视化效果不是很好,但它可以帮助您了解 windowing 是如何工作的。希望我理解你的问题,这就是你想要做的。只要时间序列中有 1 或 2(而不是 0),window 就会向前移动整个 window 长度的一部分(此处为 50%)。

要了解如何做到这一点,请从示例时间序列开始:

import matplotlib.pylab as plt
import numpy as np

N = 5000 # time series length

# create some sort of data set to work with
x = np.zeros(N)
# add a few 1s and 2s to the list (though really they are the same for the windowing)
y = np.random.random(N)
x[y < 0.01] = 1
x[y < 0.005] = 2

# assign a window length
M = 50 # window length
overlap = 0.5 # assume 50% overlap
M_overlap = int(M * (1-overlap))

我的方法是对您的时间序列感兴趣的 window 求和。如果总和 ==0,则 windows 之间没有重叠,如果总和 >0 则存在重叠。那么,问题就变成了我们应该如何有效地计算这些总和?我比较两种方法。第一种是简单地遍历时间序列,第二种是使用 convolution(速度要快得多)。对于第一个,我还探索了求和后评估 window 大小的不同方法。

求和(慢版)

def window_sum1():
    # start of windows in list windows
    windows = [0,]
    while windows[-1] + M < N:
        check = sum(x[windows[-1]:windows[-1]+M]) == 0
        windows.append(windows[-1] + M_overlap + (M - M_overlap) * check)
        if windows[-1] + M > N:
            windows.pop()
            break
    # plotting stuff for checking
    return(windows)
Niter = 10**4
print(timeit.timeit(window_sum1, number = Niter))
# 29.201083058

所以这种方法在大约 30 秒内处理了 10,000 个长度为 5000 的时间序列。但是windows.append(windows[-1] + M_overlap + (M - M_overlap) * check)行可以在if语句中精简。

求和(快版,比慢版快33%)

def window_sum2():
    # start of windows in list windows
    windows = [0,]
    while windows[-1] + M < N:
        check = sum(x[windows[-1]:windows[-1]+M]) == 0
        if check:
            windows.append(windows[-1] + M)
        else:
            windows.append(windows[-1] + M_overlap)
        if windows[-1] + M > N:
            windows.pop()
            break
    # plotting stuff for checking
    return(windows)
print(timeit.timeit(window_sum2, number = Niter))
# 20.456240447000003

我们发现 if 语句的时间减少了 1/3。

卷积(比快速求和快85%)

我们可以使用信号处理来获得更快的速度,方法是使用 numpy.convolve. (Disclaimer: I got the idea from the accepted answer to 将时间序列与感兴趣的 window 进行卷积。)当然,采用更快的 window 上面的尺寸评估。

def window_conv():
    a = np.convolve(x,np.ones(M,dtype=int),'valid')
    windows = [0,]
    while windows[-1] + M < N:
        if a[windows[-1]]:
            windows.append(windows[-1] + M_overlap)
        else:
            windows.append(windows[-1] + M)
        if windows[-1] + M > N:
            windows.pop()
            break
    return(windows)
print(timeit.timeit(window_conv, number = Niter))
#3.3695770570000008

滑动window

我要补充的最后一件事是,如 , as of numpy 1.20 there is a function called sliding_window_view 的评论之一所示。我还有 numpy 1.19 运行 并且无法测试它是否比卷积更快。

起初,我认为你应该将这一行 if np.any(y[start_cursor, end_cursor] != 0): 修改为 if np.any(y[start_cursor:end_cursor] != 0):

无论如何, 我认为我们可以在某些时候修改您的代码。

首先,您可以修改这部分:

 if np.any(y[start_cursor: end_cursor] != 0):
        start_cursor = start_cursor - overlap_size

在这些行之前你计算了 y[start_cursor:end_cursor].max() 所以你知道有没有大于 0 的标签。所以这是一个更好的:

  if data[-1]['label'] != 0):
            start_cursor -= overlap_size

虽然,更好的方法是将 y[start_cursor:end_cursor].max() 设置为用于设置 'label' 和检查“if 表达式”

的值

其次,您对数据使用了“追加”。效率太低了。最好的方法是分配零帧(你的帧有固定大小,你知道最大帧数是maxNumFrame=np.ceil((arr.size-overlap_size)/(window_size-overlap_size))。所以,你应该在第一步初始化frames=np.zeros((maxNumFrame,window_size)),然后你改变帧在“while”或者如果你想使用你的自定义结构,你可以用零值初始化你的列表,然后在“while”

中改变值

第三,最好的方法是稍后计算“start_cursor”和y并将它们设置到元组数组或2数组中。 (“end_cursor”是多余的) 之后,按照我所说的一种方式使用“地图”制作框架。 (在一个数组或您自定义的结构中)