是否可以在仍在生成数据时计算中位数? Python 在线中位数计算器
Is it possible to compute median while data is still being generated? Python online median calculator
我见过这个问题的更广泛版本,其中个人正在寻找不止一个汇总统计数据,但我没有看到提出的解决方案。我只对 Python 中的中位数感兴趣。
假设我在循环中生成一百万个值。由于内存问题,我无法将百万个值保存到列表中并在完成后计算中位数。是否可以边走边计算中位数?意思是,我只是逐步对值求和,然后除以一百万。对于中位数,答案似乎并不那么直观。
我卡在了 "thought experiment" 部分,所以我无法真正尝试任何我认为可行的方法。我不确定这是否是已经实现的算法,但如果已经实现我就找不到了。
除非您对 "values" 的想法以某种可利用的方式受到限制,否则这是不可能的; and/or 你可以多次传递数据; and/or 您也愿意将内容存储在磁盘上。假设您知道有 5 个值,都是不同的整数,并且您知道前 3 个是 5、6、7。中位数将是其中之一,但此时您不知道是哪一个,所以您必须记住他们都。如果接下来是 1 和 2,则中位数是 5;如果接下来是 4 和 8,则为 6;如果接下来是 8 和 9,则为 7。
这显然可以推广到任何奇数个值 range(i, i + 2*N+1)
,在您看到其中第一个 N+1
时:中位数可以是第一个 N+1
中的任何一个 N+1
,所以除非值的性质有一些可利用的东西,否则你必须在那时记住所有这些值。
可利用的示例:您知道最多有 100 个 distinct 值。然后你可以使用字典来计算每个出现的次数,并根据分布的压缩表示轻松计算最后的中位数。
近似
由于已经提到的原因,这里一般没有 "shortcut"。但我将附上 Python 合理的一次性逼近方法的代码,详见 "The Remedian: A Robust Averaging Method for Large Data Sets"。该论文还指出了其他近似方法。
关键:选择一个大于1的奇数B
。然后将连续的元素存储在缓冲区中,直到记录了其中的B
个。到那时,这些中值会进入下一个级别,缓冲区会被清除。他们的中位数仍然是那些 B
元素保留的唯一记忆。
同样的模式也在更深层次上继续存在:在 B
个中位数中的 B
个中位数被记录后, 个 中位数的中位数被记录下来进入下一级,清除二级缓冲区。然后,中位数前进仍然是进入其中的 B**2
个元素的唯一记忆。
等等。在最坏的情况下,它可能需要存储 B * log(N, B)
个值,其中 N
是元素的总数。在 Python 中很容易编写代码,因此可以根据需要创建缓冲区,因此 N
不需要提前知道。
如果B >= N
,方法是准确的,但是你也存储了每个元素。如果 B < N
,它是中位数的近似值。有关详细信息,请参阅论文 - 它非常复杂。这是一个让它看起来很好的案例;-)
>>> import random
>>> xs = [random.random() for i in range(1000001)]
>>> sorted(xs)[500000] # true median
0.5006315438367565
>>> w = MedianEst(11)
>>> for x in xs:
... w.add(x)
>>> w.get()
0.5008443883489089
也许令人惊讶的是,如果按排序顺序添加输入,情况会更糟:
>>> w.clear()
>>> for x in sorted(xs):
... w.add(x)
>>> w.get()
0.5021045181828147
用户注意!这是代码:
class MedianEst:
def __init__(self, B):
assert B > 1 and B & 1
self.B = B
self.half = B >> 1
self.clear()
def add(self, x):
for xs in self.a:
xs.append(x)
if len(xs) == self.B:
x = sorted(xs)[self.half]
xs.clear()
else:
break
else:
self.a.append([x])
def get(self):
total = 0
weight = 1
accum = []
for xs in self.a:
total += len(xs) * weight
accum.extend((x, weight) for x in xs)
weight *= self.B
# `total` elements in all
limit = total // 2 + 1
total = 0
for x, weight in sorted(accum):
total += weight
if total >= limit:
return x
def clear(self):
self.a = []
我见过这个问题的更广泛版本,其中个人正在寻找不止一个汇总统计数据,但我没有看到提出的解决方案。我只对 Python 中的中位数感兴趣。
假设我在循环中生成一百万个值。由于内存问题,我无法将百万个值保存到列表中并在完成后计算中位数。是否可以边走边计算中位数?意思是,我只是逐步对值求和,然后除以一百万。对于中位数,答案似乎并不那么直观。
我卡在了 "thought experiment" 部分,所以我无法真正尝试任何我认为可行的方法。我不确定这是否是已经实现的算法,但如果已经实现我就找不到了。
除非您对 "values" 的想法以某种可利用的方式受到限制,否则这是不可能的; and/or 你可以多次传递数据; and/or 您也愿意将内容存储在磁盘上。假设您知道有 5 个值,都是不同的整数,并且您知道前 3 个是 5、6、7。中位数将是其中之一,但此时您不知道是哪一个,所以您必须记住他们都。如果接下来是 1 和 2,则中位数是 5;如果接下来是 4 和 8,则为 6;如果接下来是 8 和 9,则为 7。
这显然可以推广到任何奇数个值 range(i, i + 2*N+1)
,在您看到其中第一个 N+1
时:中位数可以是第一个 N+1
中的任何一个 N+1
,所以除非值的性质有一些可利用的东西,否则你必须在那时记住所有这些值。
可利用的示例:您知道最多有 100 个 distinct 值。然后你可以使用字典来计算每个出现的次数,并根据分布的压缩表示轻松计算最后的中位数。
近似
由于已经提到的原因,这里一般没有 "shortcut"。但我将附上 Python 合理的一次性逼近方法的代码,详见 "The Remedian: A Robust Averaging Method for Large Data Sets"。该论文还指出了其他近似方法。
关键:选择一个大于1的奇数B
。然后将连续的元素存储在缓冲区中,直到记录了其中的B
个。到那时,这些中值会进入下一个级别,缓冲区会被清除。他们的中位数仍然是那些 B
元素保留的唯一记忆。
同样的模式也在更深层次上继续存在:在 B
个中位数中的 B
个中位数被记录后, 个 中位数的中位数被记录下来进入下一级,清除二级缓冲区。然后,中位数前进仍然是进入其中的 B**2
个元素的唯一记忆。
等等。在最坏的情况下,它可能需要存储 B * log(N, B)
个值,其中 N
是元素的总数。在 Python 中很容易编写代码,因此可以根据需要创建缓冲区,因此 N
不需要提前知道。
如果B >= N
,方法是准确的,但是你也存储了每个元素。如果 B < N
,它是中位数的近似值。有关详细信息,请参阅论文 - 它非常复杂。这是一个让它看起来很好的案例;-)
>>> import random
>>> xs = [random.random() for i in range(1000001)]
>>> sorted(xs)[500000] # true median
0.5006315438367565
>>> w = MedianEst(11)
>>> for x in xs:
... w.add(x)
>>> w.get()
0.5008443883489089
也许令人惊讶的是,如果按排序顺序添加输入,情况会更糟:
>>> w.clear()
>>> for x in sorted(xs):
... w.add(x)
>>> w.get()
0.5021045181828147
用户注意!这是代码:
class MedianEst:
def __init__(self, B):
assert B > 1 and B & 1
self.B = B
self.half = B >> 1
self.clear()
def add(self, x):
for xs in self.a:
xs.append(x)
if len(xs) == self.B:
x = sorted(xs)[self.half]
xs.clear()
else:
break
else:
self.a.append([x])
def get(self):
total = 0
weight = 1
accum = []
for xs in self.a:
total += len(xs) * weight
accum.extend((x, weight) for x in xs)
weight *= self.B
# `total` elements in all
limit = total // 2 + 1
total = 0
for x, weight in sorted(accum):
total += weight
if total >= limit:
return x
def clear(self):
self.a = []