如何有效地计算 运行 中位数

how to calculate running median efficiently

我借用了一些代码,试图实现一个函数来计算大量数据的 运行 中位数。当前的对我来说太慢了(棘手的部分是我需要从 运行 框中排除所有零 )。下面是代码:

from itertools import islice
from collections import deque
from bisect import bisect_left,insort

def median(s):
    sp = [nz for nz in s if nz!=0]
    print sp
    Mnow = len(sp)
    if Mnow == 0:
        return 0
    else:
        return np.median(sp)

def RunningMedian(seq, M):
    seq = iter(seq)
    s = []

    # Set up list s (to be sorted) and load deque with first window of seq
    s = [item for item in islice(seq,M)]
    d = deque(s)

    # Sort it in increasing order and extract the median ("center" of the sorted window)
    s.sort()
    medians = [median(s)]
    for item in seq:
        old = d.popleft()          # pop oldest from left
        d.append(item)             # push newest in from right
        del s[bisect_left(s, old)] # locate insertion point and then remove old 
        insort(s, item)            # insert newest such that new sort is not required        
        medians.append(median(s))
    return medians

效果不错,唯一的缺点就是太慢了。任何人都可以帮助我改进代码以提高效率吗?

在我探索了所有的可能性之后,下面的简单代码可以比较高效地计算:

def RunningMedian(x,N):
    idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
    b = [row[row>0] for row in x[idx]]
    return np.array(map(np.median,b))
    #return np.array([np.median(c) for c in b])  # This also works

一种方法如下:

def RunningMedian(x,N):
    idx = np.arange(N) + np.arange(len(x)-N+1)[:,None]
    b = [row[row>0] for row in x[idx]]
    return np.array(map(np.median,b))
    #return np.array([np.median(c) for c in b])  # This also works

我找到了一个更快的(快几万倍),复制如下:

from collections import deque
from bisect import insort, bisect_left
from itertools import islice
def running_median_insort(seq, window_size):
    """Contributed by Peter Otten"""
    seq = iter(seq)
    d = deque()
    s = []
    result = []
    for item in islice(seq, window_size):
        d.append(item)
        insort(s, item)
        result.append(s[len(d)//2])
    m = window_size // 2
    for item in seq:
        old = d.popleft()
        d.append(item)
        del s[bisect_left(s, old)]
        insort(s, item)
        result.append(s[m])
    return result

看看link:running_median

您可以在处理数据样本时使用两个 heaps 来维护数据样本的下半部分和上半部分。该算法是这样的:对于每个值,将其放入适当的堆中 'rebalance' 堆中,使它们的大小相差不超过 1。然后,要获得中位数,只需从中取出第一个元素较大的堆,或者如果两个堆的大小相等,则取两个堆的第一个元素的平均值。此解决方案具有 O(n log(n)) 时间复杂度。

from heapq import heappush, heappop


class RunningMedian:
    def __init__(self):
        self.lowers, self.highers = [], []

    def add_number(self, number):
        if not self.highers or number > self.highers[0]:
            heappush(self.highers, number)
        else:
            heappush(self.lowers, -number)  # for lowers we need a max heap
        self.rebalance()

    def rebalance(self):
        if len(self.lowers) - len(self.highers) > 1:
            heappush(self.highers, -heappop(self.lowers))
        elif len(self.highers) - len(self.lowers) > 1:
            heappush(self.lowers, -heappop(self.highers))

    def get_median(self):
        if len(self.lowers) == len(self.highers):
            return (-self.lowers[0] + self.highers[0])/2
        elif len(self.lowers) > len(self.highers):
            return -self.lowers[0]
        else:
            return self.highers[0]

演示:

>>> running_median = RunningMedian()
>>> for n in (12, 4, 5, 3, 8, 7):
...     running_median.add_number(n)
...     print(running_median.get_median())
... 
12
8.0
5
4.5
5
6.0

我遇到过这个问题,但不是作为程序员。我想要专用硬件来做一个序列的移动中值,移动相当大 window.

我按照上面的建议对两个堆进行了处理,但我也注意到堆内的所有数据移动和比较都可以并行完成。因此,将数据插入堆中或从堆中移动数据所需的时间是 O(1),一个常数。

对不同方法的一些速度进行了很好的比较here. But, in general, the scipy.ndimage.median_filter似乎是一种简单有效的方法。要包括排除零,可以这样做:

from scipy.ndimage import median_filter

def RunningMedian(x, N):
    return median_filter(x[x != 0], N)