如何有效地计算 运行 中位数
how to calculate running median efficiently
我借用了一些代码,试图实现一个函数来计算大量数据的 运行 中位数。当前的对我来说太慢了(棘手的部分是我需要从 运行 框中排除所有零 )。下面是代码:
from itertools import islice
from collections import deque
from bisect import bisect_left,insort
def median(s):
sp = [nz for nz in s if nz!=0]
print sp
Mnow = len(sp)
if Mnow == 0:
return 0
else:
return np.median(sp)
def RunningMedian(seq, M):
seq = iter(seq)
s = []
# Set up list s (to be sorted) and load deque with first window of seq
s = [item for item in islice(seq,M)]
d = deque(s)
# Sort it in increasing order and extract the median ("center" of the sorted window)
s.sort()
medians = [median(s)]
for item in seq:
old = d.popleft() # pop oldest from left
d.append(item) # push newest in from right
del s[bisect_left(s, old)] # locate insertion point and then remove old
insort(s, item) # insert newest such that new sort is not required
medians.append(median(s))
return medians
效果不错,唯一的缺点就是太慢了。任何人都可以帮助我改进代码以提高效率吗?
在我探索了所有的可能性之后,下面的简单代码可以比较高效地计算:
def RunningMedian(x,N):
idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
b = [row[row>0] for row in x[idx]]
return np.array(map(np.median,b))
#return np.array([np.median(c) for c in b]) # This also works
一种方法如下:
def RunningMedian(x,N):
idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
b = [row[row>0] for row in x[idx]]
return np.array(map(np.median,b))
#return np.array([np.median(c) for c in b]) # This also works
我找到了一个更快的(快几万倍),复制如下:
from collections import deque
from bisect import insort, bisect_left
from itertools import islice
def running_median_insort(seq, window_size):
"""Contributed by Peter Otten"""
seq = iter(seq)
d = deque()
s = []
result = []
for item in islice(seq, window_size):
d.append(item)
insort(s, item)
result.append(s[len(d)//2])
m = window_size // 2
for item in seq:
old = d.popleft()
d.append(item)
del s[bisect_left(s, old)]
insort(s, item)
result.append(s[m])
return result
看看link:running_median
您可以在处理数据样本时使用两个 heaps 来维护数据样本的下半部分和上半部分。该算法是这样的:对于每个值,将其放入适当的堆中 'rebalance' 堆中,使它们的大小相差不超过 1。然后,要获得中位数,只需从中取出第一个元素较大的堆,或者如果两个堆的大小相等,则取两个堆的第一个元素的平均值。此解决方案具有 O(n log(n))
时间复杂度。
from heapq import heappush, heappop
class RunningMedian:
def __init__(self):
self.lowers, self.highers = [], []
def add_number(self, number):
if not self.highers or number > self.highers[0]:
heappush(self.highers, number)
else:
heappush(self.lowers, -number) # for lowers we need a max heap
self.rebalance()
def rebalance(self):
if len(self.lowers) - len(self.highers) > 1:
heappush(self.highers, -heappop(self.lowers))
elif len(self.highers) - len(self.lowers) > 1:
heappush(self.lowers, -heappop(self.highers))
def get_median(self):
if len(self.lowers) == len(self.highers):
return (-self.lowers[0] + self.highers[0])/2
elif len(self.lowers) > len(self.highers):
return -self.lowers[0]
else:
return self.highers[0]
演示:
>>> running_median = RunningMedian()
>>> for n in (12, 4, 5, 3, 8, 7):
... running_median.add_number(n)
... print(running_median.get_median())
...
12
8.0
5
4.5
5
6.0
我遇到过这个问题,但不是作为程序员。我想要专用硬件来做一个序列的移动中值,移动相当大 window.
我按照上面的建议对两个堆进行了处理,但我也注意到堆内的所有数据移动和比较都可以并行完成。因此,将数据插入堆中或从堆中移动数据所需的时间是 O(1),一个常数。
对不同方法的一些速度进行了很好的比较here. But, in general, the scipy.ndimage.median_filter似乎是一种简单有效的方法。要包括排除零,可以这样做:
from scipy.ndimage import median_filter
def RunningMedian(x, N):
return median_filter(x[x != 0], N)
我借用了一些代码,试图实现一个函数来计算大量数据的 运行 中位数。当前的对我来说太慢了(棘手的部分是我需要从 运行 框中排除所有零 )。下面是代码:
from itertools import islice
from collections import deque
from bisect import bisect_left,insort
def median(s):
sp = [nz for nz in s if nz!=0]
print sp
Mnow = len(sp)
if Mnow == 0:
return 0
else:
return np.median(sp)
def RunningMedian(seq, M):
seq = iter(seq)
s = []
# Set up list s (to be sorted) and load deque with first window of seq
s = [item for item in islice(seq,M)]
d = deque(s)
# Sort it in increasing order and extract the median ("center" of the sorted window)
s.sort()
medians = [median(s)]
for item in seq:
old = d.popleft() # pop oldest from left
d.append(item) # push newest in from right
del s[bisect_left(s, old)] # locate insertion point and then remove old
insort(s, item) # insert newest such that new sort is not required
medians.append(median(s))
return medians
效果不错,唯一的缺点就是太慢了。任何人都可以帮助我改进代码以提高效率吗?
在我探索了所有的可能性之后,下面的简单代码可以比较高效地计算:
def RunningMedian(x,N):
idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
b = [row[row>0] for row in x[idx]]
return np.array(map(np.median,b))
#return np.array([np.median(c) for c in b]) # This also works
一种方法如下:
def RunningMedian(x,N):
idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
b = [row[row>0] for row in x[idx]]
return np.array(map(np.median,b))
#return np.array([np.median(c) for c in b]) # This also works
我找到了一个更快的(快几万倍),复制如下:
from collections import deque
from bisect import insort, bisect_left
from itertools import islice
def running_median_insort(seq, window_size):
"""Contributed by Peter Otten"""
seq = iter(seq)
d = deque()
s = []
result = []
for item in islice(seq, window_size):
d.append(item)
insort(s, item)
result.append(s[len(d)//2])
m = window_size // 2
for item in seq:
old = d.popleft()
d.append(item)
del s[bisect_left(s, old)]
insort(s, item)
result.append(s[m])
return result
看看link:running_median
您可以在处理数据样本时使用两个 heaps 来维护数据样本的下半部分和上半部分。该算法是这样的:对于每个值,将其放入适当的堆中 'rebalance' 堆中,使它们的大小相差不超过 1。然后,要获得中位数,只需从中取出第一个元素较大的堆,或者如果两个堆的大小相等,则取两个堆的第一个元素的平均值。此解决方案具有 O(n log(n))
时间复杂度。
from heapq import heappush, heappop
class RunningMedian:
def __init__(self):
self.lowers, self.highers = [], []
def add_number(self, number):
if not self.highers or number > self.highers[0]:
heappush(self.highers, number)
else:
heappush(self.lowers, -number) # for lowers we need a max heap
self.rebalance()
def rebalance(self):
if len(self.lowers) - len(self.highers) > 1:
heappush(self.highers, -heappop(self.lowers))
elif len(self.highers) - len(self.lowers) > 1:
heappush(self.lowers, -heappop(self.highers))
def get_median(self):
if len(self.lowers) == len(self.highers):
return (-self.lowers[0] + self.highers[0])/2
elif len(self.lowers) > len(self.highers):
return -self.lowers[0]
else:
return self.highers[0]
演示:
>>> running_median = RunningMedian()
>>> for n in (12, 4, 5, 3, 8, 7):
... running_median.add_number(n)
... print(running_median.get_median())
...
12
8.0
5
4.5
5
6.0
我遇到过这个问题,但不是作为程序员。我想要专用硬件来做一个序列的移动中值,移动相当大 window.
我按照上面的建议对两个堆进行了处理,但我也注意到堆内的所有数据移动和比较都可以并行完成。因此,将数据插入堆中或从堆中移动数据所需的时间是 O(1),一个常数。
对不同方法的一些速度进行了很好的比较here. But, in general, the scipy.ndimage.median_filter似乎是一种简单有效的方法。要包括排除零,可以这样做:
from scipy.ndimage import median_filter
def RunningMedian(x, N):
return median_filter(x[x != 0], N)