迭代性能

Iteration performance

我做了一个函数来通过实验评估以下问题,取自A Primer for the Mathematics of Financial Engineering

问题:设 X 为您必须抛硬币直到正面朝上的次数。什么是 E[X](期望值)和 var(X)(方差)?

按照教科书的解答,下面的代码给出正确答案:

from sympy import *

k = symbols('k')

Expected_Value = summation(k/2**k, (k, 1, oo)) # Both solutions work

Variance = summation(k**2/2**k, (k, 1, oo)) - Expected_Value**2

为了验证这个答案,我决定尝试制作一个函数来模拟这个实验。下面的代码是我想出来的。

def coin_toss(toss, probability=[0.5, 0.5]): 

    """Computes expected value and variance for coin toss experiment"""

    flips = [] # Collects total number of throws until heads appear per experiment.

    for _ in range(toss): # Simulate n flips

        number_flips=[] # Number of flips until heads is tossed  

        while sum(number_flips) == 0: # Continue simulation while Tails are thrown 

            number_flips.append(np.random.choice(2, p=probability)) # Append result to number_flips

        flips.append(len(number_flips)) #Append number of flips until lands heads to flips

    Expected_Value, Variance  = np.mean(flips), np.var(flips)    

    print('E[X]: {}'.format(Expected_Value), 
          '\nvar[X]: {}'.format(Variance)) # Return expected value 

这个运行时间如果我模拟1e6次实验,使用下面的代码大约是35.9秒。

   from timeit import Timer

   t1 = Timer("""coin_toss(1000000)""", """from __main__ import coin_toss""")

   print(t1.timeit(1))

为了加深我对 Python 的理解,这是解决此类问题的一种特别 efficient/pythonic 方式吗?我如何利用现有库来改进 efficiency/flow 执行?

为了以高效且 pythonic 的方式编码,您必须看一看 PythonSpeed and NumPy。可以在下面找到使用 numpy 的更快代码的一个示例。

abc在python+numpy中的优化是向量化操作,在这种情况下是相当困难的,因为有一个while可以实际上是无限的,硬币可以连续翻转 40 次反面。但是,不是用 toss 次迭代执行 for,而是可以在 中完成工作。这是问题 coin_toss 与此 coin_toss_2d 方法之间的主要区别。

coin_toss_2d

coin_toss_2d的主要优点是分块工作,这些块的大小有一些默认值,但可以修改(因为它们会影响速度)。因此,它只会迭代 while current_toss<toss 多次 toss%flips_at_a_time。这是通过 numpy 实现的,它允许生成一个矩阵,其中包含重复 flips_at_a_time 次掷硬币实验 flips_per_try 次的结果。该矩阵将包含 0(反面)和 1(正面)。

# i.e. doing only 5 repetitions with 3 flips_at_a_time
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# Out 
[[0 0 0] # still no head, we will have to keep trying
 [0 1 1] # head at the 2nd try (position 1 in python)
 [1 0 0]
 [1 0 1]
 [0 0 1]]

一旦得到这个结果,就会调用argmax。这找到对应于每行(重复)的最大值(将是 1,一个头)的索引,并且在多次出现的情况下,returns 第一个,这正是所需要的,第一个头之后尾巴序列。

maxs = flip_events.argmax(axis=1)
# Out
[0 1 0 0 2] 
# The first position is 0, however, flip_events[0,0]!=1, it's not a head!

但是必须考虑所有行都是0的情况。在这种情况下,最大值将为 0,它的第一次出现也将为 0,即第一列(try)。因此,我们检查在第一次尝试时找到的所有最大值是否对应于第一次尝试的头部。

not_finished = (maxs==0) & (flip_events[:,0]!=1)
# Out
[ True False False False False] # first repetition is not finished

如果不是这种情况,我们将循环重复相同的过程,但仅针对在任何尝试中都没有正面的重复。

n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
    flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
    maxs2 = flip_events.argmax(axis=1)
    maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
    not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
    not_finished[not_finished] = not_finished2
    n = np.sum(not_finished)
# Out
# flip_events
[[1 0 1]] # Now there is a head
# maxs2
[0]
# maxs
[3 1 0 0 2] # The value of the still unfinished repetition has been updated,
# taking into account that the first position in flip_events is the 4th,
# without affecting the rest

然后存储与第一次出现的头部对应的索引(我们必须加 1,因为 python 索引从零开始而不是 1)。有一个 try ... except ... 块来处理 toss 不是 repetitions_at_a_time.

的倍数的情况
def coin_toss_2d(toss, probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
    # Initialize and preallocate data
    current_toss = 0
    flips = np.empty(toss)
    # loop by chunks
    while current_toss<toss:
        # repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
        flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
        # store first head ocurrence
        maxs = flip_events.argmax(axis=1)
        # Check for all tails sequences, that is, repetitions were we have to keep trying to get a head
        not_finished = (maxs==0) & (flip_events[:,0]!=1)
        n = np.sum(not_finished)
        while n!=0: # while there are sequences without any head
            flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
            maxs2 = flip_events.argmax(axis=1)
            maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
            not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
            not_finished[not_finished] = not_finished2
            n = np.sum(not_finished)
        # try except in case toss is not multiple of repetitions_at_a_time, in general, no error is raised, that is why a try is useful
        try:
            flips[current_toss:current_toss+repetitions_at_a_time] = maxs+1
        except ValueError:
            flips[current_toss:] = maxs[:toss-current_toss]+1
        # Update current_toss and move to the next chunk
        current_toss += repetitions_at_a_time
    # Once all values are obtained, average and return them
    Expected_Value, Variance  = np.mean(flips), np.var(flips)    
    return Expected_Value, Variance

coin_toss_map

这里的代码基本相同,但是现在,内在的 while 是在一个单独的函数中完成的,它是使用 map 从包装函数 coin_toss_map 调用的。

def toss_chunk(args):
    probability,repetitions_at_a_time,flips_per_try = args
    # repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
    flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
    # store first head ocurrence
    maxs = flip_events.argmax(axis=1)
    # Check for all tails sequences
    not_finished = (maxs==0) & (flip_events[:,0]!=1)
    n = np.sum(not_finished)
    while n!=0: # while there are sequences without any head
        flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
        maxs2 = flip_events.argmax(axis=1)
        maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
        not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
        not_finished[not_finished] = not_finished2
        n = np.sum(not_finished)
    return maxs+1
def coin_toss_map(toss,probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
    n_chunks, remainder = divmod(toss,repetitions_at_a_time)
    args = [(probability,repetitions_at_a_time,flips_per_try) for _ in range(n_chunks)]
    if remainder:
        args.append((probability,remainder,flips_per_try))
    flips = np.concatenate(map(toss_chunk,args))
    # Once all values are obtained, average and return them
    Expected_Value, Variance  = np.mean(flips), np.var(flips)    
    return Expected_Value, Variance

性能比较

在我的电脑中,我得到以下计算时间:

In [1]: %timeit coin_toss(10**6)
# Out 
# ('E[X]: 2.000287', '\nvar[X]: 1.99791891763')
# ('E[X]: 2.000459', '\nvar[X]: 2.00692478932')
# ('E[X]: 1.998118', '\nvar[X]: 1.98881045808')
# ('E[X]: 1.9987', '\nvar[X]: 1.99508631')
# 1 loop, best of 3: 46.2 s per loop

In [2]: %timeit coin_toss_2d(10**6,repetitions_at_a_time=5*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 197 ms per loop

In [3]: %timeit coin_toss_map(10**6,repetitions_at_a_time=4*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 192 ms per loop

均值和方差的结果是:

In [4]: [coin_toss_2d(10**6,repetitions_at_a_time=10**5,flips_per_try=10) for _ in range(4)]
# Out
# [(1.999848, 1.9990739768960009),
#  (2.000654, 2.0046035722839997),
#  (1.999835, 2.0072329727749993),
#  (1.999277, 2.001566477271)]

In [4]: [coin_toss_map(10**6,repetitions_at_a_time=10**5,flips_per_try=4) for _ in range(4)]
# Out
# [(1.999552, 2.0005057992959996),
#  (2.001733, 2.011159996711001),
#  (2.002308, 2.012128673136001),
#  (2.000738, 2.003613455356)]