迭代性能
Iteration performance
我做了一个函数来通过实验评估以下问题,取自A Primer for the Mathematics of Financial Engineering。
问题:设 X 为您必须抛硬币直到正面朝上的次数。什么是 E[X](期望值)和 var(X)(方差)?
按照教科书的解答,下面的代码给出正确答案:
from sympy import *
k = symbols('k')
Expected_Value = summation(k/2**k, (k, 1, oo)) # Both solutions work
Variance = summation(k**2/2**k, (k, 1, oo)) - Expected_Value**2
为了验证这个答案,我决定尝试制作一个函数来模拟这个实验。下面的代码是我想出来的。
def coin_toss(toss, probability=[0.5, 0.5]):
"""Computes expected value and variance for coin toss experiment"""
flips = [] # Collects total number of throws until heads appear per experiment.
for _ in range(toss): # Simulate n flips
number_flips=[] # Number of flips until heads is tossed
while sum(number_flips) == 0: # Continue simulation while Tails are thrown
number_flips.append(np.random.choice(2, p=probability)) # Append result to number_flips
flips.append(len(number_flips)) #Append number of flips until lands heads to flips
Expected_Value, Variance = np.mean(flips), np.var(flips)
print('E[X]: {}'.format(Expected_Value),
'\nvar[X]: {}'.format(Variance)) # Return expected value
这个运行时间如果我模拟1e6次实验,使用下面的代码大约是35.9秒。
from timeit import Timer
t1 = Timer("""coin_toss(1000000)""", """from __main__ import coin_toss""")
print(t1.timeit(1))
为了加深我对 Python 的理解,这是解决此类问题的一种特别 efficient/pythonic 方式吗?我如何利用现有库来改进 efficiency/flow 执行?
为了以高效且 pythonic 的方式编码,您必须看一看 PythonSpeed and NumPy。可以在下面找到使用 numpy 的更快代码的一个示例。
abc在python+numpy中的优化是向量化操作,在这种情况下是相当困难的,因为有一个while
可以实际上是无限的,硬币可以连续翻转 40 次反面。但是,不是用 toss
次迭代执行 for
,而是可以在 块 中完成工作。这是问题 coin_toss
与此 coin_toss_2d
方法之间的主要区别。
coin_toss_2d
coin_toss_2d
的主要优点是分块工作,这些块的大小有一些默认值,但可以修改(因为它们会影响速度)。因此,它只会迭代 while current_toss<toss
多次 toss%flips_at_a_time
。这是通过 numpy 实现的,它允许生成一个矩阵,其中包含重复 flips_at_a_time
次掷硬币实验 flips_per_try
次的结果。该矩阵将包含 0(反面)和 1(正面)。
# i.e. doing only 5 repetitions with 3 flips_at_a_time
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# Out
[[0 0 0] # still no head, we will have to keep trying
[0 1 1] # head at the 2nd try (position 1 in python)
[1 0 0]
[1 0 1]
[0 0 1]]
一旦得到这个结果,就会调用argmax
。这找到对应于每行(重复)的最大值(将是 1,一个头)的索引,并且在多次出现的情况下,returns 第一个,这正是所需要的,第一个头之后尾巴序列。
maxs = flip_events.argmax(axis=1)
# Out
[0 1 0 0 2]
# The first position is 0, however, flip_events[0,0]!=1, it's not a head!
但是必须考虑所有行都是0的情况。在这种情况下,最大值将为 0,它的第一次出现也将为 0,即第一列(try)。因此,我们检查在第一次尝试时找到的所有最大值是否对应于第一次尝试的头部。
not_finished = (maxs==0) & (flip_events[:,0]!=1)
# Out
[ True False False False False] # first repetition is not finished
如果不是这种情况,我们将循环重复相同的过程,但仅针对在任何尝试中都没有正面的重复。
n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
maxs2 = flip_events.argmax(axis=1)
maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
not_finished2 = (maxs2==0) & (flip_events[:,0]!=1)
not_finished[not_finished] = not_finished2
n = np.sum(not_finished)
# Out
# flip_events
[[1 0 1]] # Now there is a head
# maxs2
[0]
# maxs
[3 1 0 0 2] # The value of the still unfinished repetition has been updated,
# taking into account that the first position in flip_events is the 4th,
# without affecting the rest
然后存储与第一次出现的头部对应的索引(我们必须加 1,因为 python 索引从零开始而不是 1)。有一个 try ... except ...
块来处理 toss
不是 repetitions_at_a_time
.
的倍数的情况
def coin_toss_2d(toss, probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
# Initialize and preallocate data
current_toss = 0
flips = np.empty(toss)
# loop by chunks
while current_toss<toss:
# repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# store first head ocurrence
maxs = flip_events.argmax(axis=1)
# Check for all tails sequences, that is, repetitions were we have to keep trying to get a head
not_finished = (maxs==0) & (flip_events[:,0]!=1)
n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
maxs2 = flip_events.argmax(axis=1)
maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
not_finished2 = (maxs2==0) & (flip_events[:,0]!=1)
not_finished[not_finished] = not_finished2
n = np.sum(not_finished)
# try except in case toss is not multiple of repetitions_at_a_time, in general, no error is raised, that is why a try is useful
try:
flips[current_toss:current_toss+repetitions_at_a_time] = maxs+1
except ValueError:
flips[current_toss:] = maxs[:toss-current_toss]+1
# Update current_toss and move to the next chunk
current_toss += repetitions_at_a_time
# Once all values are obtained, average and return them
Expected_Value, Variance = np.mean(flips), np.var(flips)
return Expected_Value, Variance
coin_toss_map
这里的代码基本相同,但是现在,内在的 while 是在一个单独的函数中完成的,它是使用 map
从包装函数 coin_toss_map
调用的。
def toss_chunk(args):
probability,repetitions_at_a_time,flips_per_try = args
# repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# store first head ocurrence
maxs = flip_events.argmax(axis=1)
# Check for all tails sequences
not_finished = (maxs==0) & (flip_events[:,0]!=1)
n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
maxs2 = flip_events.argmax(axis=1)
maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
not_finished2 = (maxs2==0) & (flip_events[:,0]!=1)
not_finished[not_finished] = not_finished2
n = np.sum(not_finished)
return maxs+1
def coin_toss_map(toss,probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
n_chunks, remainder = divmod(toss,repetitions_at_a_time)
args = [(probability,repetitions_at_a_time,flips_per_try) for _ in range(n_chunks)]
if remainder:
args.append((probability,remainder,flips_per_try))
flips = np.concatenate(map(toss_chunk,args))
# Once all values are obtained, average and return them
Expected_Value, Variance = np.mean(flips), np.var(flips)
return Expected_Value, Variance
性能比较
在我的电脑中,我得到以下计算时间:
In [1]: %timeit coin_toss(10**6)
# Out
# ('E[X]: 2.000287', '\nvar[X]: 1.99791891763')
# ('E[X]: 2.000459', '\nvar[X]: 2.00692478932')
# ('E[X]: 1.998118', '\nvar[X]: 1.98881045808')
# ('E[X]: 1.9987', '\nvar[X]: 1.99508631')
# 1 loop, best of 3: 46.2 s per loop
In [2]: %timeit coin_toss_2d(10**6,repetitions_at_a_time=5*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 197 ms per loop
In [3]: %timeit coin_toss_map(10**6,repetitions_at_a_time=4*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 192 ms per loop
均值和方差的结果是:
In [4]: [coin_toss_2d(10**6,repetitions_at_a_time=10**5,flips_per_try=10) for _ in range(4)]
# Out
# [(1.999848, 1.9990739768960009),
# (2.000654, 2.0046035722839997),
# (1.999835, 2.0072329727749993),
# (1.999277, 2.001566477271)]
In [4]: [coin_toss_map(10**6,repetitions_at_a_time=10**5,flips_per_try=4) for _ in range(4)]
# Out
# [(1.999552, 2.0005057992959996),
# (2.001733, 2.011159996711001),
# (2.002308, 2.012128673136001),
# (2.000738, 2.003613455356)]
我做了一个函数来通过实验评估以下问题,取自A Primer for the Mathematics of Financial Engineering。
问题:设 X 为您必须抛硬币直到正面朝上的次数。什么是 E[X](期望值)和 var(X)(方差)?
按照教科书的解答,下面的代码给出正确答案:
from sympy import *
k = symbols('k')
Expected_Value = summation(k/2**k, (k, 1, oo)) # Both solutions work
Variance = summation(k**2/2**k, (k, 1, oo)) - Expected_Value**2
为了验证这个答案,我决定尝试制作一个函数来模拟这个实验。下面的代码是我想出来的。
def coin_toss(toss, probability=[0.5, 0.5]):
"""Computes expected value and variance for coin toss experiment"""
flips = [] # Collects total number of throws until heads appear per experiment.
for _ in range(toss): # Simulate n flips
number_flips=[] # Number of flips until heads is tossed
while sum(number_flips) == 0: # Continue simulation while Tails are thrown
number_flips.append(np.random.choice(2, p=probability)) # Append result to number_flips
flips.append(len(number_flips)) #Append number of flips until lands heads to flips
Expected_Value, Variance = np.mean(flips), np.var(flips)
print('E[X]: {}'.format(Expected_Value),
'\nvar[X]: {}'.format(Variance)) # Return expected value
这个运行时间如果我模拟1e6次实验,使用下面的代码大约是35.9秒。
from timeit import Timer
t1 = Timer("""coin_toss(1000000)""", """from __main__ import coin_toss""")
print(t1.timeit(1))
为了加深我对 Python 的理解,这是解决此类问题的一种特别 efficient/pythonic 方式吗?我如何利用现有库来改进 efficiency/flow 执行?
为了以高效且 pythonic 的方式编码,您必须看一看 PythonSpeed and NumPy。可以在下面找到使用 numpy 的更快代码的一个示例。
abc在python+numpy中的优化是向量化操作,在这种情况下是相当困难的,因为有一个while
可以实际上是无限的,硬币可以连续翻转 40 次反面。但是,不是用 toss
次迭代执行 for
,而是可以在 块 中完成工作。这是问题 coin_toss
与此 coin_toss_2d
方法之间的主要区别。
coin_toss_2d
coin_toss_2d
的主要优点是分块工作,这些块的大小有一些默认值,但可以修改(因为它们会影响速度)。因此,它只会迭代 while current_toss<toss
多次 toss%flips_at_a_time
。这是通过 numpy 实现的,它允许生成一个矩阵,其中包含重复 flips_at_a_time
次掷硬币实验 flips_per_try
次的结果。该矩阵将包含 0(反面)和 1(正面)。
# i.e. doing only 5 repetitions with 3 flips_at_a_time
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# Out
[[0 0 0] # still no head, we will have to keep trying
[0 1 1] # head at the 2nd try (position 1 in python)
[1 0 0]
[1 0 1]
[0 0 1]]
一旦得到这个结果,就会调用argmax
。这找到对应于每行(重复)的最大值(将是 1,一个头)的索引,并且在多次出现的情况下,returns 第一个,这正是所需要的,第一个头之后尾巴序列。
maxs = flip_events.argmax(axis=1)
# Out
[0 1 0 0 2]
# The first position is 0, however, flip_events[0,0]!=1, it's not a head!
但是必须考虑所有行都是0的情况。在这种情况下,最大值将为 0,它的第一次出现也将为 0,即第一列(try)。因此,我们检查在第一次尝试时找到的所有最大值是否对应于第一次尝试的头部。
not_finished = (maxs==0) & (flip_events[:,0]!=1)
# Out
[ True False False False False] # first repetition is not finished
如果不是这种情况,我们将循环重复相同的过程,但仅针对在任何尝试中都没有正面的重复。
n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
maxs2 = flip_events.argmax(axis=1)
maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
not_finished2 = (maxs2==0) & (flip_events[:,0]!=1)
not_finished[not_finished] = not_finished2
n = np.sum(not_finished)
# Out
# flip_events
[[1 0 1]] # Now there is a head
# maxs2
[0]
# maxs
[3 1 0 0 2] # The value of the still unfinished repetition has been updated,
# taking into account that the first position in flip_events is the 4th,
# without affecting the rest
然后存储与第一次出现的头部对应的索引(我们必须加 1,因为 python 索引从零开始而不是 1)。有一个 try ... except ...
块来处理 toss
不是 repetitions_at_a_time
.
def coin_toss_2d(toss, probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
# Initialize and preallocate data
current_toss = 0
flips = np.empty(toss)
# loop by chunks
while current_toss<toss:
# repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# store first head ocurrence
maxs = flip_events.argmax(axis=1)
# Check for all tails sequences, that is, repetitions were we have to keep trying to get a head
not_finished = (maxs==0) & (flip_events[:,0]!=1)
n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
maxs2 = flip_events.argmax(axis=1)
maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
not_finished2 = (maxs2==0) & (flip_events[:,0]!=1)
not_finished[not_finished] = not_finished2
n = np.sum(not_finished)
# try except in case toss is not multiple of repetitions_at_a_time, in general, no error is raised, that is why a try is useful
try:
flips[current_toss:current_toss+repetitions_at_a_time] = maxs+1
except ValueError:
flips[current_toss:] = maxs[:toss-current_toss]+1
# Update current_toss and move to the next chunk
current_toss += repetitions_at_a_time
# Once all values are obtained, average and return them
Expected_Value, Variance = np.mean(flips), np.var(flips)
return Expected_Value, Variance
coin_toss_map
这里的代码基本相同,但是现在,内在的 while 是在一个单独的函数中完成的,它是使用 map
从包装函数 coin_toss_map
调用的。
def toss_chunk(args):
probability,repetitions_at_a_time,flips_per_try = args
# repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# store first head ocurrence
maxs = flip_events.argmax(axis=1)
# Check for all tails sequences
not_finished = (maxs==0) & (flip_events[:,0]!=1)
n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
maxs2 = flip_events.argmax(axis=1)
maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
not_finished2 = (maxs2==0) & (flip_events[:,0]!=1)
not_finished[not_finished] = not_finished2
n = np.sum(not_finished)
return maxs+1
def coin_toss_map(toss,probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
n_chunks, remainder = divmod(toss,repetitions_at_a_time)
args = [(probability,repetitions_at_a_time,flips_per_try) for _ in range(n_chunks)]
if remainder:
args.append((probability,remainder,flips_per_try))
flips = np.concatenate(map(toss_chunk,args))
# Once all values are obtained, average and return them
Expected_Value, Variance = np.mean(flips), np.var(flips)
return Expected_Value, Variance
性能比较
在我的电脑中,我得到以下计算时间:
In [1]: %timeit coin_toss(10**6)
# Out
# ('E[X]: 2.000287', '\nvar[X]: 1.99791891763')
# ('E[X]: 2.000459', '\nvar[X]: 2.00692478932')
# ('E[X]: 1.998118', '\nvar[X]: 1.98881045808')
# ('E[X]: 1.9987', '\nvar[X]: 1.99508631')
# 1 loop, best of 3: 46.2 s per loop
In [2]: %timeit coin_toss_2d(10**6,repetitions_at_a_time=5*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 197 ms per loop
In [3]: %timeit coin_toss_map(10**6,repetitions_at_a_time=4*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 192 ms per loop
均值和方差的结果是:
In [4]: [coin_toss_2d(10**6,repetitions_at_a_time=10**5,flips_per_try=10) for _ in range(4)]
# Out
# [(1.999848, 1.9990739768960009),
# (2.000654, 2.0046035722839997),
# (1.999835, 2.0072329727749993),
# (1.999277, 2.001566477271)]
In [4]: [coin_toss_map(10**6,repetitions_at_a_time=10**5,flips_per_try=4) for _ in range(4)]
# Out
# [(1.999552, 2.0005057992959996),
# (2.001733, 2.011159996711001),
# (2.002308, 2.012128673136001),
# (2.000738, 2.003613455356)]