Python Numba/NumPy 中实施的分摊 O(1) 滚动最小值
Amortized O(1) rolling minimum implemented in Python Numba/NumPy
我正在尝试实施具有摊销 O(1) get_min()
的滚动最小值。摊销的 O(1) 算法来自 the accepted answer in this post
原函数:
import pandas as pd
import numpy as np
from numba import njit, prange
def rolling_min_original(data, n):
return pd.Series(data).rolling(n).min().to_numpy()
我尝试实现分摊 O(1) get_min()
算法:(此函数对非小 n
具有不错的性能)
@njit
def rollin_min(data, n):
"""
brief explanations:
param: stk2: the stack2 in the algorithm, except here it only stores the min stack
param: stk2_top: it starts at n-1, and drops gradually until it hits -1 then it comes backup to n-1
if stk2_top= 0 in the current iteration(it will become -1 at the end):
that means stk2_top is pointing at the bottom element in stk2,
after it drops to -1 from 0, in the next iteration, stk2 will be reassigned to a new array data[i-n+1:i+1],
because we need to include the current index.
at each iteration:
if stk2_top <0: (i.e. we have 0 stuff in stk2(aka stk2_top <0)
- copy the past n items(including the current one) to stk2, so that stk2 has n items now
- pick the top min from stk2(stk2_top = n-1 momentarily)
- move down the pointer by 1 after the operation(n-1 becomes n-2)
else: (i.e. we have j(1<=j<= n-1) stuff in stk2)
- pick the top min from stk2(stk2_top is j-1 momentarily)
- move down the pointer by 1 after the operation(j-1 becomes j-2)
"""
if n >1:
def min_getter_rev(arr1):
arr = arr1[::-1]
result = np.empty(len(arr), dtype = arr1.dtype)
result[0]= local_min = arr[0]
for i in range(1,len(arr)):
if arr[i] < local_min:
local_min = arr[i]
result[i] = local_min
return result
result_min= np.empty(len(data), dtype= data.dtype)
for i in prange(n-1):
result_min[i] =np.nan
stk2 = min_getter_rev(data[:n])
stk2_top = n-2#it is n-2 because the loop starts at n(not n-1)which is the second non nan term
stk1_min = data[n-1]#stk1 needs to be the first item of the stk1
result_min[n-1]= min(stk1_min, stk2[-1])
for i in range(n,len(data)):
if stk2_top >= 0:
if data[i] < stk1_min:
stk1_min= min(data[i], stk1_min)#the stk1 min
result_min[i] = min(stk1_min, stk2[stk2_top])#min of the top element in stk2 and the current element
else:
stk2 = min_getter_rev(data[i-n+1:i+1])
stk2_top= n-1
stk1_min = data[i]
result_min[i]= min(stk1_min, stk2[n-1])
stk2_top -= 1
return result_min
else:
return data
n
小时的简单实现:
@njit(parallel= True)
def rolling_min_smalln(data, n):
result= np.empty(len(data), dtype= data.dtype)
for i in prange(n-1):
result[i]= np.nan
for i in prange(n-1, len(data)):
result[i]= data[i-n+1: i+1].min()
return result
一些用于测试的小代码
def remove_nan(arr):
return arr[~np.isnan(arr)]
if __name__ == '__main__':
np.random.seed(0)
data_size = 200000
data = np.random.uniform(0,1000, size = data_size)+29000
w_size = 37
r_min_original= rolling_min_original(data, w_size)
rmin1 = rollin_min(data, w_size)
r_min_original = remove_nan(r_min_original)
rmin1 = remove_nan(rmin1)
print(np.array_equal(r_min_original,rmin1))
当n
很大时,函数rollin_min()
的运行时间几乎恒定,运行时间比rolling_min_original()
短,这很好。但是当 n
较低时(在我的电脑中大约 n < 37
,在此范围内 rollin_min()
很容易被天真的实现 rolling_min_smalln()
打败,它的性能很差。
我正在努力寻找改进的方法rollin_min()
,但到目前为止我还是卡住了,这就是我在这里寻求帮助的原因。
我的问题如下:
我正在实施的算法是 rolling/sliding window min/max 的最佳算法吗?
如果不是,best/better算法是什么?如果是这样,我如何从算法的角度进一步改进功能?
除了算法本身,还有哪些方法可以进一步提升函数的性能rollin_min()
?
编辑:根据多个请求将我的最新答案移至答案部分
您的代码运行缓慢的主要原因可能是在 min_getter_rev 中分配了一个新数组。您应该始终重复使用相同的存储空间。
然后,因为你真的不需要实现一个队列,你可以做更多的优化。例如,两个堆栈的大小最多(通常)为 n,因此您可以将它们保存在大小为 n 的同一个数组中。从头种一个,从尾种一个。
您会注意到有一个非常规则的模式 - 按顺序从头到尾填充数组,从末尾重新计算最小值,在重新填充数组时生成输出,重复...
这导致了一个实际上更简单的算法,其解释更简单,根本不涉及堆栈。这是一个实现,其中包含有关其工作原理的评论。请注意,我没有费心用 NaN 填充开头:
def rollin_min(data, n):
#allocate the result. Note the number valid windows is len(data)-(n-1)
result = np.empty(len(data)-(n-1), data.dtype)
#every nth position is a "mark"
#every window therefore contains exactly 1 mark
#the minimum in the window is the minimum of:
# the minimum from the window start to the following mark; and
# the minimum from the window end the the preceding (same) mark
#calculate the minimum from every window start index to the next mark
for mark in range(n-1, len(data), n):
v = data[mark]
if (mark < len(result)):
result[mark] = v
for i in range(mark-1, mark-n, -1):
v = min(data[i],v)
if (i < len(result)):
result[i] = v
#for each window, calculate the running total from the preceding mark
# to its end. The first window ends at the first mark
#then combine it with the first distance to get the window minimum
nextMarkPos = 0
for i in range(0,len(result)):
if i == nextMarkPos:
v = data[i+n-1]
nextMarkPos += n
else:
v = min(data[i+n-1],v)
result[i] = min(result[i],v)
return result
应多个请求将其从“问题编辑”部分移至此处。
受到 Matt Timmermans 在答案中给出的更简单实现的启发,我制作了一个 cpu-rolling min 的多核版本。代码如下:
@njit(parallel= True)
def rollin_min2(data, n):
"""
1) make a loop that iterates over K sections of n elements; each section is independent so that it can benefit from multicores cpu
2) for each iteration of sections, generate backward local minimum(sec_min2) and forward minimum(sec_min1)
say m=8, len(data)= 23, then we only need the idx= (reversed to 7,6,5,...,1,0 (0 means minimum up until idx= 0)),
1st iter
result[7]= min_until 0,
result[8]= min(min(data[7:9]) and min_until 1),
result[9]= min(min(data[7:10]) and m_til 2)
...
result[14]= min(min(data[7:15]) and m_til 7)
2nd iter
result[15]= min_until 8,
result[16]= min(min(data[15:17]) and m_til 9),
result[17]= min(min(data[15:18]) and m_til 10)
...
result[22]= min(min(data[15:23]) and m_til 15)
"""
ar_len= len(data)
sec_min1= np.empty(ar_len, dtype = data.dtype)
sec_min2= np.empty(ar_len, dtype = data.dtype)
for i in prange(n-1):
sec_min1[i]= np.nan
for sec in prange(ar_len//n):
s2_min= data[n*sec+ n-1]
s1_min= data[n*sec+ n]
for i in range(n-1,-1,-1):
if data[n*sec+i] < s2_min:
s2_min= data[n*sec+i]
sec_min2[n*sec+i]= s2_min
sec_min1[n*sec+ n-1]= sec_min2[n*sec]
for i in range(n-1):
if n*sec+n+i < ar_len:
if data[n*sec+n+i] < s1_min:
s1_min= data[n*sec+n+i]
sec_min1[n*sec+n+i]= min(s1_min, sec_min2[n*sec+i+1])
else:
break
return sec_min1
我实际上已经花了一个小时来测试 rolling min 的各种实现。在我的 6C/12T 笔记本电脑中,这个多核版本在 n 为 "medium size" 时效果最佳。但是,当 n 至少是源数据长度的 30% 时,其他实现开始胜出。必须有更好的方法来改进此功能,但在编辑时我还不知道它们。
我正在尝试实施具有摊销 O(1) get_min()
的滚动最小值。摊销的 O(1) 算法来自 the accepted answer in this post
原函数:
import pandas as pd
import numpy as np
from numba import njit, prange
def rolling_min_original(data, n):
return pd.Series(data).rolling(n).min().to_numpy()
我尝试实现分摊 O(1) get_min()
算法:(此函数对非小 n
具有不错的性能)
@njit
def rollin_min(data, n):
"""
brief explanations:
param: stk2: the stack2 in the algorithm, except here it only stores the min stack
param: stk2_top: it starts at n-1, and drops gradually until it hits -1 then it comes backup to n-1
if stk2_top= 0 in the current iteration(it will become -1 at the end):
that means stk2_top is pointing at the bottom element in stk2,
after it drops to -1 from 0, in the next iteration, stk2 will be reassigned to a new array data[i-n+1:i+1],
because we need to include the current index.
at each iteration:
if stk2_top <0: (i.e. we have 0 stuff in stk2(aka stk2_top <0)
- copy the past n items(including the current one) to stk2, so that stk2 has n items now
- pick the top min from stk2(stk2_top = n-1 momentarily)
- move down the pointer by 1 after the operation(n-1 becomes n-2)
else: (i.e. we have j(1<=j<= n-1) stuff in stk2)
- pick the top min from stk2(stk2_top is j-1 momentarily)
- move down the pointer by 1 after the operation(j-1 becomes j-2)
"""
if n >1:
def min_getter_rev(arr1):
arr = arr1[::-1]
result = np.empty(len(arr), dtype = arr1.dtype)
result[0]= local_min = arr[0]
for i in range(1,len(arr)):
if arr[i] < local_min:
local_min = arr[i]
result[i] = local_min
return result
result_min= np.empty(len(data), dtype= data.dtype)
for i in prange(n-1):
result_min[i] =np.nan
stk2 = min_getter_rev(data[:n])
stk2_top = n-2#it is n-2 because the loop starts at n(not n-1)which is the second non nan term
stk1_min = data[n-1]#stk1 needs to be the first item of the stk1
result_min[n-1]= min(stk1_min, stk2[-1])
for i in range(n,len(data)):
if stk2_top >= 0:
if data[i] < stk1_min:
stk1_min= min(data[i], stk1_min)#the stk1 min
result_min[i] = min(stk1_min, stk2[stk2_top])#min of the top element in stk2 and the current element
else:
stk2 = min_getter_rev(data[i-n+1:i+1])
stk2_top= n-1
stk1_min = data[i]
result_min[i]= min(stk1_min, stk2[n-1])
stk2_top -= 1
return result_min
else:
return data
n
小时的简单实现:
@njit(parallel= True)
def rolling_min_smalln(data, n):
result= np.empty(len(data), dtype= data.dtype)
for i in prange(n-1):
result[i]= np.nan
for i in prange(n-1, len(data)):
result[i]= data[i-n+1: i+1].min()
return result
一些用于测试的小代码
def remove_nan(arr):
return arr[~np.isnan(arr)]
if __name__ == '__main__':
np.random.seed(0)
data_size = 200000
data = np.random.uniform(0,1000, size = data_size)+29000
w_size = 37
r_min_original= rolling_min_original(data, w_size)
rmin1 = rollin_min(data, w_size)
r_min_original = remove_nan(r_min_original)
rmin1 = remove_nan(rmin1)
print(np.array_equal(r_min_original,rmin1))
当n
很大时,函数rollin_min()
的运行时间几乎恒定,运行时间比rolling_min_original()
短,这很好。但是当 n
较低时(在我的电脑中大约 n < 37
,在此范围内 rollin_min()
很容易被天真的实现 rolling_min_smalln()
打败,它的性能很差。
我正在努力寻找改进的方法rollin_min()
,但到目前为止我还是卡住了,这就是我在这里寻求帮助的原因。
我的问题如下:
我正在实施的算法是 rolling/sliding window min/max 的最佳算法吗?
如果不是,best/better算法是什么?如果是这样,我如何从算法的角度进一步改进功能?
除了算法本身,还有哪些方法可以进一步提升函数的性能rollin_min()
?
编辑:根据多个请求将我的最新答案移至答案部分
您的代码运行缓慢的主要原因可能是在 min_getter_rev 中分配了一个新数组。您应该始终重复使用相同的存储空间。
然后,因为你真的不需要实现一个队列,你可以做更多的优化。例如,两个堆栈的大小最多(通常)为 n,因此您可以将它们保存在大小为 n 的同一个数组中。从头种一个,从尾种一个。
您会注意到有一个非常规则的模式 - 按顺序从头到尾填充数组,从末尾重新计算最小值,在重新填充数组时生成输出,重复...
这导致了一个实际上更简单的算法,其解释更简单,根本不涉及堆栈。这是一个实现,其中包含有关其工作原理的评论。请注意,我没有费心用 NaN 填充开头:
def rollin_min(data, n):
#allocate the result. Note the number valid windows is len(data)-(n-1)
result = np.empty(len(data)-(n-1), data.dtype)
#every nth position is a "mark"
#every window therefore contains exactly 1 mark
#the minimum in the window is the minimum of:
# the minimum from the window start to the following mark; and
# the minimum from the window end the the preceding (same) mark
#calculate the minimum from every window start index to the next mark
for mark in range(n-1, len(data), n):
v = data[mark]
if (mark < len(result)):
result[mark] = v
for i in range(mark-1, mark-n, -1):
v = min(data[i],v)
if (i < len(result)):
result[i] = v
#for each window, calculate the running total from the preceding mark
# to its end. The first window ends at the first mark
#then combine it with the first distance to get the window minimum
nextMarkPos = 0
for i in range(0,len(result)):
if i == nextMarkPos:
v = data[i+n-1]
nextMarkPos += n
else:
v = min(data[i+n-1],v)
result[i] = min(result[i],v)
return result
应多个请求将其从“问题编辑”部分移至此处。
受到 Matt Timmermans 在答案中给出的更简单实现的启发,我制作了一个 cpu-rolling min 的多核版本。代码如下:
@njit(parallel= True)
def rollin_min2(data, n):
"""
1) make a loop that iterates over K sections of n elements; each section is independent so that it can benefit from multicores cpu
2) for each iteration of sections, generate backward local minimum(sec_min2) and forward minimum(sec_min1)
say m=8, len(data)= 23, then we only need the idx= (reversed to 7,6,5,...,1,0 (0 means minimum up until idx= 0)),
1st iter
result[7]= min_until 0,
result[8]= min(min(data[7:9]) and min_until 1),
result[9]= min(min(data[7:10]) and m_til 2)
...
result[14]= min(min(data[7:15]) and m_til 7)
2nd iter
result[15]= min_until 8,
result[16]= min(min(data[15:17]) and m_til 9),
result[17]= min(min(data[15:18]) and m_til 10)
...
result[22]= min(min(data[15:23]) and m_til 15)
"""
ar_len= len(data)
sec_min1= np.empty(ar_len, dtype = data.dtype)
sec_min2= np.empty(ar_len, dtype = data.dtype)
for i in prange(n-1):
sec_min1[i]= np.nan
for sec in prange(ar_len//n):
s2_min= data[n*sec+ n-1]
s1_min= data[n*sec+ n]
for i in range(n-1,-1,-1):
if data[n*sec+i] < s2_min:
s2_min= data[n*sec+i]
sec_min2[n*sec+i]= s2_min
sec_min1[n*sec+ n-1]= sec_min2[n*sec]
for i in range(n-1):
if n*sec+n+i < ar_len:
if data[n*sec+n+i] < s1_min:
s1_min= data[n*sec+n+i]
sec_min1[n*sec+n+i]= min(s1_min, sec_min2[n*sec+i+1])
else:
break
return sec_min1
我实际上已经花了一个小时来测试 rolling min 的各种实现。在我的 6C/12T 笔记本电脑中,这个多核版本在 n 为 "medium size" 时效果最佳。但是,当 n 至少是源数据长度的 30% 时,其他实现开始胜出。必须有更好的方法来改进此功能,但在编辑时我还不知道它们。