Select 时间复杂度为 O(1) 的流中的随机项 space
Select random item from stream in O(1) space
Select 以均匀概率从流中随机抽取一个项目,使用常量 space
流提供以下操作:
class Stream:
def __init__(self, data):
self.data = list(data)
def read(self):
if not self.data:
return None
head, *self.data = self.data
return head
def peek(self):
return self.data[0] if self.data else None
流中的元素(因此 data
的元素)大小不变,并且它们都不是 None
,因此 None
表示流结束。流的长度只能通过消费整个流来获知。并注意计算元素数量消耗 O(log n) space.
我相信没有办法统一使用O(1)[=30=从流中随机选择一个项目] space.
任何人都可以(反)证明这一点吗?
常量space?当然,Reservoir Sampling,常数space,线性时间
一些简单测试的代码
import numpy as np
def stream(size):
for k in range(size):
yield k
def resSample(ni, s):
ret = np.empty(ni, dtype=np.int64)
k = 0
for k, v in enumerate(s):
if k < ni:
ret[k] = v
else:
idx = np.random.randint(0, k+1)
if (idx < ni):
ret[idx] = v
return ret
SIZE = 12
s = stream(SIZE)
q = resSample(1, s)
print(q)
我看到有一个关于 RNG 的问题。假设我有真正的 RNG,一次 returns 一位的硬件设备。我们仅在获取索引的代码中使用它。
if (idx < ni):
一个元素触发条件的唯一方法是 select
是 ni=1
,因此 idx
只能为零。
因此 np.random.randint(0, k+1) 这样的实现就像
def trng(k):
for _ in range(k+1):
if next_true_bit():
return 1 # as soon as it is not 0, we don't care
return 0 # all bits are zero, index is zero, proceed with exchange
QED,这种实现是可能的,因此这种采样方法应该有效
更新
@kyrill 可能是对的 - 我必须进行计数(log2(k) 存储),到目前为止还没有办法避免它。即使使用 RNG 技巧,我也必须以 1/k
的概率对 0 进行采样,并且这个 k
会随着流的大小而增长。
为每个元素生成一个随机数,并记住数字最小的元素。
这是我最喜欢的答案,但您可能正在寻找的答案是:
如果流的长度为 N 项,则返回第 Nth 项的概率为 1/N 。由于这个概率对于每个 N 都是不同的,任何能够完成这个任务的机器在读取不同长度的流后必须进入不同的状态。由于可能长度的数量是无限的,所需的可能状态数量也是无限的,机器将需要无限量的内存来区分它们。
Select 以均匀概率从流中随机抽取一个项目,使用常量 space
流提供以下操作:
class Stream:
def __init__(self, data):
self.data = list(data)
def read(self):
if not self.data:
return None
head, *self.data = self.data
return head
def peek(self):
return self.data[0] if self.data else None
流中的元素(因此 data
的元素)大小不变,并且它们都不是 None
,因此 None
表示流结束。流的长度只能通过消费整个流来获知。并注意计算元素数量消耗 O(log n) space.
我相信没有办法统一使用O(1)[=30=从流中随机选择一个项目] space.
任何人都可以(反)证明这一点吗?
常量space?当然,Reservoir Sampling,常数space,线性时间
一些简单测试的代码
import numpy as np
def stream(size):
for k in range(size):
yield k
def resSample(ni, s):
ret = np.empty(ni, dtype=np.int64)
k = 0
for k, v in enumerate(s):
if k < ni:
ret[k] = v
else:
idx = np.random.randint(0, k+1)
if (idx < ni):
ret[idx] = v
return ret
SIZE = 12
s = stream(SIZE)
q = resSample(1, s)
print(q)
我看到有一个关于 RNG 的问题。假设我有真正的 RNG,一次 returns 一位的硬件设备。我们仅在获取索引的代码中使用它。
if (idx < ni):
一个元素触发条件的唯一方法是 select
是 ni=1
,因此 idx
只能为零。
因此 np.random.randint(0, k+1) 这样的实现就像
def trng(k):
for _ in range(k+1):
if next_true_bit():
return 1 # as soon as it is not 0, we don't care
return 0 # all bits are zero, index is zero, proceed with exchange
QED,这种实现是可能的,因此这种采样方法应该有效
更新
@kyrill 可能是对的 - 我必须进行计数(log2(k) 存储),到目前为止还没有办法避免它。即使使用 RNG 技巧,我也必须以 1/k
的概率对 0 进行采样,并且这个 k
会随着流的大小而增长。
为每个元素生成一个随机数,并记住数字最小的元素。
这是我最喜欢的答案,但您可能正在寻找的答案是:
如果流的长度为 N 项,则返回第 Nth 项的概率为 1/N 。由于这个概率对于每个 N 都是不同的,任何能够完成这个任务的机器在读取不同长度的流后必须进入不同的状态。由于可能长度的数量是无限的,所需的可能状态数量也是无限的,机器将需要无限量的内存来区分它们。