是否可以在仍在生成数据时计算中位数? Python 在线中位数计算器

Is it possible to compute median while data is still being generated? Python online median calculator

我见过这个问题的更广泛版本,其中个人正在寻找不止一个汇总统计数据,但我没有看到提出的解决方案。我只对 Python 中的中位数感兴趣。

假设我在循环中生成一百万个值。由于内存问题,我无法将百万个值保存到列表中并在完成后计算中位数。是否可以边走边计算中位数?意思是,我只是逐步对值求和,然后除以一百万。对于中位数,答案似乎并不那么直观。

我卡在了 "thought experiment" 部分,所以我无法真正尝试任何我认为可行的方法。我不确定这是否是已经实现的算法,但如果已经实现我就找不到了。

除非您对 "values" 的想法以某种可利用的方式受到限制,否则这是不可能的; and/or 你可以多次传递数据; and/or 您也愿意将内容存储在磁盘上。假设您知道有 5 个值,都是不同的整数,并且您知道前 3 个是 5、6、7。中位数将是其中之一,但此时您不知道是哪一个,所以您必须记住他们都。如果接下来是 1 和 2,则中位数是 5;如果接下来是 4 和 8,则为 6;如果接下来是 8 和 9,则为 7。

这显然可以推广到任何奇数个值 range(i, i + 2*N+1),在您看到其中第一个 N+1 时:中位数可以是第一个 N+1 中的任何一个 N+1,所以除非值的性质有一些可利用的东西,否则你必须在那时记住所有这些值。

可利用的示例:您知道最多有 100 个 distinct 值。然后你可以使用字典来计算每个出现的次数,并根据分布的压缩表示轻松计算最后的中位数。

近似

由于已经提到的原因,这里一般没有 "shortcut"。但我将附上 Python 合理的一次性逼近方法的代码,详见 "The Remedian: A Robust Averaging Method for Large Data Sets"。该论文还指出了其他近似方法。

关键:选择一个大于1的奇数B。然后将连续的元素存储在缓冲区中,直到记录了其中的B个。到那时,这些中值会进入下一个级别,缓冲区会被清除。他们的中位数仍然是那些 B 元素保留的唯一记忆。

同样的模式也在更深层次上继续存在:在 B 个中位数中的 B 个中位数被记录后, 中位数的中位数被记录下来进入下一级,清除二级缓冲区。然后,中位数前进仍然是进入其中的 B**2 个元素的唯一记忆。

等等。在最坏的情况下,它可能需要存储 B * log(N, B) 个值,其中 N 是元素的总数。在 Python 中很容易编写代码,因此可以根据需要创建缓冲区,因此 N 不需要提前知道。

如果B >= N,方法是准确的,但是你也存储了每个元素。如果 B < N,它是中位数的近似值。有关详细信息,请参阅论文 - 它非常复杂。这是一个让它看起来很好的案例;-)

>>> import random
>>> xs = [random.random() for i in range(1000001)]
>>> sorted(xs)[500000] # true median
0.5006315438367565
>>> w = MedianEst(11)
>>> for x in xs:
...     w.add(x)
>>> w.get()
0.5008443883489089

也许令人惊讶的是,如果按排序顺序添加输入,情况会更糟:

>>> w.clear()
>>> for x in sorted(xs):
...     w.add(x)
>>> w.get()
0.5021045181828147

用户注意!这是代码:

class MedianEst:
    def __init__(self, B):
        assert B > 1 and B & 1
        self.B = B
        self.half = B >> 1
        self.clear()

    def add(self, x):
        for xs in self.a:
            xs.append(x)
            if len(xs) == self.B:
                x = sorted(xs)[self.half]
                xs.clear()
            else:
                break
        else:
            self.a.append([x])

    def get(self):
        total = 0
        weight = 1
        accum = []
        for xs in self.a:
            total += len(xs) * weight
            accum.extend((x, weight) for x in xs)
            weight *= self.B
        # `total` elements in all
        limit = total // 2 + 1
        total = 0
        for x, weight in sorted(accum):
            total += weight
            if total >= limit:
                return x

    def clear(self):
        self.a = []