numpy 数组的快速条件重叠窗口(框架)
Fast conditional overlapping windowing (framing) of numpy array
我有一个巨大的 numpy 数组列表(一维),它们是不同事件的时间序列。每个点都有一个标签,我想 window 基于其标签的 numpy 数组。我的标签是 0、1 和 2。每个 window 都有固定大小 M.
每个 window 的标签将是 window 中可用的最大标签。因此,如果 window 由 0 和 1 标记的数据点组成,则整个 window.
的标签将为 1
但问题是,windowing 不是标签不可知论者。由于 class 不平衡,我只想在标签 1 和 2 的情况下重叠 windowing。
到目前为止我已经写了这段代码:
# conditional framing
data = []
start_cursor = 0
while start_cursor < arr.size:
end_cursor = start_cursor + window_size
data.append(
{
"frame": arr[start_cursor:end_cursor],
"label": y[start_cursor:end_cursor].max(),
}
)
start_cursor = end_cursor
if np.any(y[start_cursor, end_cursor] != 0):
start_cursor = start_cursor - overlap_size
但这显然过于冗长而且效率低下,尤其是因为我将在我的大量单独数组列表上调用这个 while 循环。
编辑:更多地解释问题。假设你要 window 一个固定长度 M 的信号。如果 window 中只存在 0 个标签点,那么相邻 windows 之间不会有重叠。但如果存在标签 1 和 2,则两个信号之间将有百分比 p% 的重叠。
我认为这可以满足您的要求。用于检查的可视化效果不是很好,但它可以帮助您了解 windowing 是如何工作的。希望我理解你的问题,这就是你想要做的。只要时间序列中有 1 或 2(而不是 0),window 就会向前移动整个 window 长度的一部分(此处为 50%)。
要了解如何做到这一点,请从示例时间序列开始:
import matplotlib.pylab as plt
import numpy as np
N = 5000 # time series length
# create some sort of data set to work with
x = np.zeros(N)
# add a few 1s and 2s to the list (though really they are the same for the windowing)
y = np.random.random(N)
x[y < 0.01] = 1
x[y < 0.005] = 2
# assign a window length
M = 50 # window length
overlap = 0.5 # assume 50% overlap
M_overlap = int(M * (1-overlap))
我的方法是对您的时间序列感兴趣的 window 求和。如果总和 ==0
,则 windows 之间没有重叠,如果总和 >0
则存在重叠。那么,问题就变成了我们应该如何有效地计算这些总和?我比较两种方法。第一种是简单地遍历时间序列,第二种是使用 convolution(速度要快得多)。对于第一个,我还探索了求和后评估 window 大小的不同方法。
求和(慢版)
def window_sum1():
# start of windows in list windows
windows = [0,]
while windows[-1] + M < N:
check = sum(x[windows[-1]:windows[-1]+M]) == 0
windows.append(windows[-1] + M_overlap + (M - M_overlap) * check)
if windows[-1] + M > N:
windows.pop()
break
# plotting stuff for checking
return(windows)
Niter = 10**4
print(timeit.timeit(window_sum1, number = Niter))
# 29.201083058
所以这种方法在大约 30 秒内处理了 10,000 个长度为 5000 的时间序列。但是windows.append(windows[-1] + M_overlap + (M - M_overlap) * check)
行可以在if语句中精简。
求和(快版,比慢版快33%)
def window_sum2():
# start of windows in list windows
windows = [0,]
while windows[-1] + M < N:
check = sum(x[windows[-1]:windows[-1]+M]) == 0
if check:
windows.append(windows[-1] + M)
else:
windows.append(windows[-1] + M_overlap)
if windows[-1] + M > N:
windows.pop()
break
# plotting stuff for checking
return(windows)
print(timeit.timeit(window_sum2, number = Niter))
# 20.456240447000003
我们发现 if 语句的时间减少了 1/3。
卷积(比快速求和快85%)
我们可以使用信号处理来获得更快的速度,方法是使用 numpy.convolve. (Disclaimer: I got the idea from the accepted answer to 将时间序列与感兴趣的 window 进行卷积。)当然,采用更快的 window 上面的尺寸评估。
def window_conv():
a = np.convolve(x,np.ones(M,dtype=int),'valid')
windows = [0,]
while windows[-1] + M < N:
if a[windows[-1]]:
windows.append(windows[-1] + M_overlap)
else:
windows.append(windows[-1] + M)
if windows[-1] + M > N:
windows.pop()
break
return(windows)
print(timeit.timeit(window_conv, number = Niter))
#3.3695770570000008
滑动window
我要补充的最后一件事是,如 , as of numpy 1.20
there is a function called sliding_window_view 的评论之一所示。我还有 numpy 1.19
运行 并且无法测试它是否比卷积更快。
起初,我认为你应该将这一行 if np.any(y[start_cursor, end_cursor] != 0):
修改为 if np.any(y[start_cursor:end_cursor] != 0):
无论如何,
我认为我们可以在某些时候修改您的代码。
首先,您可以修改这部分:
if np.any(y[start_cursor: end_cursor] != 0):
start_cursor = start_cursor - overlap_size
在这些行之前你计算了 y[start_cursor:end_cursor].max()
所以你知道有没有大于 0 的标签。所以这是一个更好的:
if data[-1]['label'] != 0):
start_cursor -= overlap_size
虽然,更好的方法是将 y[start_cursor:end_cursor].max()
设置为用于设置 'label' 和检查“if 表达式”
的值
其次,您对数据使用了“追加”。效率太低了。最好的方法是分配零帧(你的帧有固定大小,你知道最大帧数是maxNumFrame=np.ceil((arr.size-overlap_size)/(window_size-overlap_size))
。所以,你应该在第一步初始化frames=np.zeros((maxNumFrame,window_size))
,然后你改变帧在“while”或者如果你想使用你的自定义结构,你可以用零值初始化你的列表,然后在“while”
中改变值
第三,最好的方法是稍后计算“start_cursor”和y并将它们设置到元组数组或2数组中。 (“end_cursor”是多余的)
之后,按照我所说的一种方式使用“地图”制作框架。 (在一个数组或您自定义的结构中)
我有一个巨大的 numpy 数组列表(一维),它们是不同事件的时间序列。每个点都有一个标签,我想 window 基于其标签的 numpy 数组。我的标签是 0、1 和 2。每个 window 都有固定大小 M.
每个 window 的标签将是 window 中可用的最大标签。因此,如果 window 由 0 和 1 标记的数据点组成,则整个 window.
的标签将为 1但问题是,windowing 不是标签不可知论者。由于 class 不平衡,我只想在标签 1 和 2 的情况下重叠 windowing。
到目前为止我已经写了这段代码:
# conditional framing
data = []
start_cursor = 0
while start_cursor < arr.size:
end_cursor = start_cursor + window_size
data.append(
{
"frame": arr[start_cursor:end_cursor],
"label": y[start_cursor:end_cursor].max(),
}
)
start_cursor = end_cursor
if np.any(y[start_cursor, end_cursor] != 0):
start_cursor = start_cursor - overlap_size
但这显然过于冗长而且效率低下,尤其是因为我将在我的大量单独数组列表上调用这个 while 循环。
编辑:更多地解释问题。假设你要 window 一个固定长度 M 的信号。如果 window 中只存在 0 个标签点,那么相邻 windows 之间不会有重叠。但如果存在标签 1 和 2,则两个信号之间将有百分比 p% 的重叠。
我认为这可以满足您的要求。用于检查的可视化效果不是很好,但它可以帮助您了解 windowing 是如何工作的。希望我理解你的问题,这就是你想要做的。只要时间序列中有 1 或 2(而不是 0),window 就会向前移动整个 window 长度的一部分(此处为 50%)。
要了解如何做到这一点,请从示例时间序列开始:
import matplotlib.pylab as plt
import numpy as np
N = 5000 # time series length
# create some sort of data set to work with
x = np.zeros(N)
# add a few 1s and 2s to the list (though really they are the same for the windowing)
y = np.random.random(N)
x[y < 0.01] = 1
x[y < 0.005] = 2
# assign a window length
M = 50 # window length
overlap = 0.5 # assume 50% overlap
M_overlap = int(M * (1-overlap))
我的方法是对您的时间序列感兴趣的 window 求和。如果总和 ==0
,则 windows 之间没有重叠,如果总和 >0
则存在重叠。那么,问题就变成了我们应该如何有效地计算这些总和?我比较两种方法。第一种是简单地遍历时间序列,第二种是使用 convolution(速度要快得多)。对于第一个,我还探索了求和后评估 window 大小的不同方法。
求和(慢版)
def window_sum1():
# start of windows in list windows
windows = [0,]
while windows[-1] + M < N:
check = sum(x[windows[-1]:windows[-1]+M]) == 0
windows.append(windows[-1] + M_overlap + (M - M_overlap) * check)
if windows[-1] + M > N:
windows.pop()
break
# plotting stuff for checking
return(windows)
Niter = 10**4
print(timeit.timeit(window_sum1, number = Niter))
# 29.201083058
所以这种方法在大约 30 秒内处理了 10,000 个长度为 5000 的时间序列。但是windows.append(windows[-1] + M_overlap + (M - M_overlap) * check)
行可以在if语句中精简。
求和(快版,比慢版快33%)
def window_sum2():
# start of windows in list windows
windows = [0,]
while windows[-1] + M < N:
check = sum(x[windows[-1]:windows[-1]+M]) == 0
if check:
windows.append(windows[-1] + M)
else:
windows.append(windows[-1] + M_overlap)
if windows[-1] + M > N:
windows.pop()
break
# plotting stuff for checking
return(windows)
print(timeit.timeit(window_sum2, number = Niter))
# 20.456240447000003
我们发现 if 语句的时间减少了 1/3。
卷积(比快速求和快85%)
我们可以使用信号处理来获得更快的速度,方法是使用 numpy.convolve. (Disclaimer: I got the idea from the accepted answer to
def window_conv():
a = np.convolve(x,np.ones(M,dtype=int),'valid')
windows = [0,]
while windows[-1] + M < N:
if a[windows[-1]]:
windows.append(windows[-1] + M_overlap)
else:
windows.append(windows[-1] + M)
if windows[-1] + M > N:
windows.pop()
break
return(windows)
print(timeit.timeit(window_conv, number = Niter))
#3.3695770570000008
滑动window
我要补充的最后一件事是,如 numpy 1.20
there is a function called sliding_window_view 的评论之一所示。我还有 numpy 1.19
运行 并且无法测试它是否比卷积更快。
起初,我认为你应该将这一行 if np.any(y[start_cursor, end_cursor] != 0):
修改为 if np.any(y[start_cursor:end_cursor] != 0):
无论如何, 我认为我们可以在某些时候修改您的代码。
首先,您可以修改这部分:
if np.any(y[start_cursor: end_cursor] != 0):
start_cursor = start_cursor - overlap_size
在这些行之前你计算了 y[start_cursor:end_cursor].max()
所以你知道有没有大于 0 的标签。所以这是一个更好的:
if data[-1]['label'] != 0):
start_cursor -= overlap_size
虽然,更好的方法是将 y[start_cursor:end_cursor].max()
设置为用于设置 'label' 和检查“if 表达式”
其次,您对数据使用了“追加”。效率太低了。最好的方法是分配零帧(你的帧有固定大小,你知道最大帧数是maxNumFrame=np.ceil((arr.size-overlap_size)/(window_size-overlap_size))
。所以,你应该在第一步初始化frames=np.zeros((maxNumFrame,window_size))
,然后你改变帧在“while”或者如果你想使用你的自定义结构,你可以用零值初始化你的列表,然后在“while”
第三,最好的方法是稍后计算“start_cursor”和y并将它们设置到元组数组或2数组中。 (“end_cursor”是多余的) 之后,按照我所说的一种方式使用“地图”制作框架。 (在一个数组或您自定义的结构中)