Python 自定义 Zipf 数字生成器性能不佳
Python Custom Zipf Number Generator Performing Poorly
我需要一个自定义的类似 Zipf 的数字生成器,因为 numpy.random.zipf
函数不能满足我的需要。首先,它的 alpha
必须大于 1.0
并且我需要 0.5
的 alpha。其次,它的基数与样本量直接相关,我需要制作比基数更多的样本,例如从只有 6 个唯一值的 Zipfian 分布中列出 1000 个元素。
@stanga 向此发布了 a great solution。
import random
import bisect
import math
class ZipfGenerator:
def __init__(self, n, alpha):
# Calculate Zeta values from 1 to n:
tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
# Store the translation map:
self.distMap = [x / zeta[-1] for x in zeta]
def next(self):
# Take a uniform 0-1 pseudo-random value:
u = random.random()
# Translate the Zipf variable:
return bisect.bisect(self.distMap, u) - 1
alpha
可以小于 1.0
,对于固定基数 n
,采样可以是无限的。问题是运行太慢了。
# Calculate Zeta values from 1 to n:
tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
这两行是罪魁祸首。当我选择 n=50000
时,我可以在大约 10 秒内生成我的列表。我需要在 n=5000000
时执行此操作,但这是不可行的。我不完全理解为什么它执行得这么慢,因为(我认为)它具有线性复杂性和浮点运算看起来很简单。我在一个好的服务器上使用 Python 2.6.6。
是否有我可以进行的优化或完全不同的解决方案来满足我的要求?
编辑:我正在使用@ev-br 推荐的修改,用可能的解决方案更新我的问题。我已将其简化为 returns 整个列表的子例程。 @ev-br 建议将 bisect
更改为 searchssorted
是正确的,因为前者也被证明是一个瓶颈。
def randZipf(n, alpha, numSamples):
# Calculate Zeta values from 1 to n:
tmp = numpy.power( numpy.arange(1, n+1), -alpha )
zeta = numpy.r_[0.0, numpy.cumsum(tmp)]
# Store the translation map:
distMap = [x / zeta[-1] for x in zeta]
# Generate an array of uniform 0-1 pseudo-random values:
u = numpy.random.random(numSamples)
# bisect them with distMap
v = numpy.searchsorted(distMap, u)
samples = [t-1 for t in v]
return samples
先举个小例子
In [1]: import numpy as np
In [2]: import math
In [3]: alpha = 0.1
In [4]: n = 5
In [5]: tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
In [6]: zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
In [7]: tmp
Out[7]:
[1.0,
0.9330329915368074,
0.8959584598407623,
0.8705505632961241,
0.8513399225207846]
In [8]: zeta
Out[8]:
[0,
1.0,
1.9330329915368074,
2.82899145137757,
3.699542014673694,
4.550881937194479]
现在,让我们尝试对其进行矢量化,从最内层的操作开始。 reduce
调用本质上是一个累加和:
In [9]: np.cumsum(tmp)
Out[9]: array([ 1. , 1.93303299, 2.82899145, 3.69954201, 4.55088194])
你想要一个前导零,所以让我们在前面添加它:
In [11]: np.r_[0., np.cumsum(tmp)]
Out[11]:
array([ 0. , 1. , 1.93303299, 2.82899145, 3.69954201,
4.55088194])
你的tmp
数组也可以一次性构建:
In [12]: tmp_vec = np.power(np.arange(1, n+1) , -alpha)
In [13]: tmp_vec
Out[13]: array([ 1. , 0.93303299, 0.89595846, 0.87055056, 0.85133992])
现在,快速而肮脏的时间安排
In [14]: %%timeit
....: n = 1000
....: tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
....: zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
....:
100 loops, best of 3: 3.16 ms per loop
In [15]: %%timeit
....: n = 1000
....: tmp_vec = np.power(np.arange(1, n+1) , -alpha)
....: zeta_vec = np.r_[0., np.cumsum(tmp)]
....:
10000 loops, best of 3: 101 µs per loop
现在,随着 n
的增加变得更好:
In [18]: %%timeit
n = 50000
tmp_vec = np.power(np.arange(1, n+1) , -alpha)
zeta_vec = np.r_[0, np.cumsum(tmp)]
....:
100 loops, best of 3: 3.26 ms per loop
与
相比
In [19]: %%timeit
n = 50000
tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
....:
1 loops, best of 3: 7.01 s per loop
在线下,对 bisect
的调用可以替换为 np.searchsorted
。
编辑: 一些与原始问题没有直接关系的评论,而是基于我对可能使您失望的猜测:
- 随机生成器应该接受种子。您可以依赖 numpy 的全局
np.random.seed
,但最好将其设为默认为 None
的显式参数(意思是不要播种它。)
samples = [t-1 for t in v]
不需要,只需 return v-1
.
- 最好避免混用驼峰式和 pep8_lower_case_with_underscores。
- 请注意,这与
scipy.stats.rv_discrete
所做的非常相似。如果您只需要采样,那很好。如果您需要一个完整的发行版,您可以考虑使用它。
我需要一个自定义的类似 Zipf 的数字生成器,因为 numpy.random.zipf
函数不能满足我的需要。首先,它的 alpha
必须大于 1.0
并且我需要 0.5
的 alpha。其次,它的基数与样本量直接相关,我需要制作比基数更多的样本,例如从只有 6 个唯一值的 Zipfian 分布中列出 1000 个元素。
@stanga 向此发布了 a great solution。
import random
import bisect
import math
class ZipfGenerator:
def __init__(self, n, alpha):
# Calculate Zeta values from 1 to n:
tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
# Store the translation map:
self.distMap = [x / zeta[-1] for x in zeta]
def next(self):
# Take a uniform 0-1 pseudo-random value:
u = random.random()
# Translate the Zipf variable:
return bisect.bisect(self.distMap, u) - 1
alpha
可以小于 1.0
,对于固定基数 n
,采样可以是无限的。问题是运行太慢了。
# Calculate Zeta values from 1 to n:
tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
这两行是罪魁祸首。当我选择 n=50000
时,我可以在大约 10 秒内生成我的列表。我需要在 n=5000000
时执行此操作,但这是不可行的。我不完全理解为什么它执行得这么慢,因为(我认为)它具有线性复杂性和浮点运算看起来很简单。我在一个好的服务器上使用 Python 2.6.6。
是否有我可以进行的优化或完全不同的解决方案来满足我的要求?
编辑:我正在使用@ev-br 推荐的修改,用可能的解决方案更新我的问题。我已将其简化为 returns 整个列表的子例程。 @ev-br 建议将 bisect
更改为 searchssorted
是正确的,因为前者也被证明是一个瓶颈。
def randZipf(n, alpha, numSamples):
# Calculate Zeta values from 1 to n:
tmp = numpy.power( numpy.arange(1, n+1), -alpha )
zeta = numpy.r_[0.0, numpy.cumsum(tmp)]
# Store the translation map:
distMap = [x / zeta[-1] for x in zeta]
# Generate an array of uniform 0-1 pseudo-random values:
u = numpy.random.random(numSamples)
# bisect them with distMap
v = numpy.searchsorted(distMap, u)
samples = [t-1 for t in v]
return samples
先举个小例子
In [1]: import numpy as np
In [2]: import math
In [3]: alpha = 0.1
In [4]: n = 5
In [5]: tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
In [6]: zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
In [7]: tmp
Out[7]:
[1.0,
0.9330329915368074,
0.8959584598407623,
0.8705505632961241,
0.8513399225207846]
In [8]: zeta
Out[8]:
[0,
1.0,
1.9330329915368074,
2.82899145137757,
3.699542014673694,
4.550881937194479]
现在,让我们尝试对其进行矢量化,从最内层的操作开始。 reduce
调用本质上是一个累加和:
In [9]: np.cumsum(tmp)
Out[9]: array([ 1. , 1.93303299, 2.82899145, 3.69954201, 4.55088194])
你想要一个前导零,所以让我们在前面添加它:
In [11]: np.r_[0., np.cumsum(tmp)]
Out[11]:
array([ 0. , 1. , 1.93303299, 2.82899145, 3.69954201,
4.55088194])
你的tmp
数组也可以一次性构建:
In [12]: tmp_vec = np.power(np.arange(1, n+1) , -alpha)
In [13]: tmp_vec
Out[13]: array([ 1. , 0.93303299, 0.89595846, 0.87055056, 0.85133992])
现在,快速而肮脏的时间安排
In [14]: %%timeit
....: n = 1000
....: tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
....: zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
....:
100 loops, best of 3: 3.16 ms per loop
In [15]: %%timeit
....: n = 1000
....: tmp_vec = np.power(np.arange(1, n+1) , -alpha)
....: zeta_vec = np.r_[0., np.cumsum(tmp)]
....:
10000 loops, best of 3: 101 µs per loop
现在,随着 n
的增加变得更好:
In [18]: %%timeit
n = 50000
tmp_vec = np.power(np.arange(1, n+1) , -alpha)
zeta_vec = np.r_[0, np.cumsum(tmp)]
....:
100 loops, best of 3: 3.26 ms per loop
与
相比In [19]: %%timeit
n = 50000
tmp = [1. / (math.pow(float(i), alpha)) for i in range(1, n+1)]
zeta = reduce(lambda sums, x: sums + [sums[-1] + x], tmp, [0])
....:
1 loops, best of 3: 7.01 s per loop
在线下,对 bisect
的调用可以替换为 np.searchsorted
。
编辑: 一些与原始问题没有直接关系的评论,而是基于我对可能使您失望的猜测:
- 随机生成器应该接受种子。您可以依赖 numpy 的全局
np.random.seed
,但最好将其设为默认为None
的显式参数(意思是不要播种它。) samples = [t-1 for t in v]
不需要,只需return v-1
.- 最好避免混用驼峰式和 pep8_lower_case_with_underscores。
- 请注意,这与
scipy.stats.rv_discrete
所做的非常相似。如果您只需要采样,那很好。如果您需要一个完整的发行版,您可以考虑使用它。