Reducing Computation Time for a Nested Loop

I would like to reduce the computation time of the code posted below. In essence, it computes the array Tf as the outcome of the following nested loop:

Af = lambda x: Approximationf(f, x)
for idxp, prior in enumerate(grid_prior):
    for idxy, y in enumerate(grid_y):
        posterior = lambda yPrime: updated_posterior(prior, y, yPrime)
        integrateL = integrate(lambda z: Af(np.array([y*np.exp(mu[0])*z,
                                            posterior(y*np.exp(mu[0]) * z)])))
        integrateH = integrate(lambda z: Af(np.array([y*np.exp(mu[1])*z,
                                            posterior(y * np.exp(mu[1])*z)])))
        Tf[idxy, idxp] = (h[idxy, idxp] +
                          beta * ((prior * integrateL) +
                                  (1-prior)*integrateH))

The objects posterior, integrate and Af are functions that are called repeatedly while traversing the loop. The function posterior computes a scalar named posterior. The function Af approximates the function f at the sample points x and passes the result to the function integrate, which computes the conditional expectation of f.

The code posted below is a simplification of a harder problem. Instead of running the nested loop once, I have to run it many times to solve a fixed-point problem: the problem is initialized with an arbitrary function f, and the loop produces the array Tf. That array is then fed into the next pass of the nested loop to compute another array Tf, and the process continues until convergence.
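For context, a minimal sketch of that outer fixed-point iteration might look as follows; fixed_point, apply_T, tol and max_iter are hypothetical names, and apply_T stands for one pass of the nested loop above:

import numpy as np

def fixed_point(f0, apply_T, tol=1e-6, max_iter=500):
    """Iterate f -> T(f) until successive arrays stop changing (sup-norm)."""
    f = f0
    for _ in range(max_iter):
        Tf = apply_T(f)                      # one pass of the nested loop
        if np.max(np.abs(Tf - f)) < tol:     # convergence check
            return Tf
        f = Tf
    return f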

I decided not to report results from the cProfile module. If the iteration to convergence is ignored and the nested loop is run only once, many internal Python calls take a relatively long time. However, when iterating until convergence, these internal calls lose their relative importance and are pushed down to lower positions in the cProfile output.
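For anyone who wants to reproduce the profile, one minimal way to profile the full iteration rather than a single pass is sketched below; run_until_convergence is a hypothetical wrapper around the loop above:

import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()
run_until_convergence()          # hypothetical wrapper around the nested loop
profiler.disable()
pstats.Stats(profiler).sort_stats('cumulative').print_stats(20)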

I tried to imitate various suggestions for reducing loop computation time that I found online for slightly different problems. Unfortunately, I could not make them work, and I could not really find a general approach to this kind of problem either. Does anyone know how to reduce the computation time of this loop? I appreciate any help!

import numpy as np
from scipy import interpolate
from scipy.stats import lognorm
from scipy.integrate import fixed_quad

# == The following lines define the parameters for the problem == #
gamma, beta, sigma, mu = 2, 0.95,  0.0255, np.array([0.0113, -0.0016])
grid_y, grid_prior = np.linspace(7, 10, 15), np.linspace(0, 1, 5)
int_min, int_max = np.exp(- 7 * sigma), np.exp(+ 7 * sigma)
phi = lognorm(sigma)

f = np.array([[ 1.29824564,  1.29161017,  1.28379398,  1.2676886, 1.15320819],
[ 1.26290108, 1.26147364,  1.24755837,  1.23819851,  1.11912802],
[ 1.22847276,  1.23013194,  1.22128198,  1.20996971, 1.0864706 ],
[ 1.19528104,  1.19645792,  1.19056084,  1.17980572, 1.05532966],
[ 1.16344832,  1.16279841,  1.15997191,  1.15169942,  1.02564429],
[ 1.13301675,  1.13109952,  1.12883038,  1.1236645,  0.99730795],
[ 1.10398195,  1.10125013,  1.0988554,   1.09612933,  0.97019688],
[ 1.07630046,  1.07356297,  1.07126087,  1.06878758,  0.94417658],
[ 1.04989686,  1.04728542,  1.04514962,  1.04289665,  0.91910765],
[ 1.02467087,  1.0221532,   1.02011384,  1.01797238,  0.89485162],
[ 1.00050447,  0.99795025,  0.99576917,  0.99330549,  0.87127677],
[ 0.97726849,  0.97443288,  0.97190614,  0.96861352, 0.84826362],
[ 0.95482612,  0.94783816,  0.94340077,  0.93753641,  0.82569922],
[ 0.93302433,  0.91985497,  0.9059118,   0.88895196,  0.80348449],
[ 0.91165997,  0.88253486,  0.86126688,  0.84769975,  0.78147382]])

# == Calculate function h, Used in the loop below ==  #
E0 = np.exp((1-gamma)*mu + (1-gamma)**2*sigma**2/2)
h = np.outer(beta*grid_y**(1-gamma), grid_prior*E0[0] + (1-grid_prior)*E0[1])

def integrate(g):
    """
    This function is repeatedly called in the loop below
    """
    integrand = lambda z: g(z) * phi.pdf(z)
    result = fixed_quad(integrand, int_min, int_max, n=15)[0]
    return result

def Approximationf(f, x):
    """
    This function approximates the function f and is repeatedly called in
    the loop
    """
    # == allocate the output array == #
    fApprox = np.empty(x.shape[1])
    lower, middle = (x[0] < grid_y[0]), (x[0] >= grid_y[0]) & (x[0] <= grid_y[-1])
    upper = (x[0] > grid_y[-1])

    # = Calculate Polynomial == #
    y_tile = np.tile(grid_y, len(grid_prior))
    prior_repeat = np.repeat(grid_prior, len(grid_y))
    s = interpolate.SmoothBivariateSpline(y_tile, prior_repeat,
                                          f.T.flatten(), kx=5, ky=5)

    # == interpolation == #
    fApprox[middle] = s(x[0, middle], x[1, middle])[:, 0]

    # == Extrapolation == #
    if any(lower):
        s0 = s(lower[lower]*grid_y[0], x[1, lower])[:, 0]
        s1 = s(lower[lower]*grid_y[1], x[1, lower])[:, 0]
        slope_lower = (s0 - s1)/(grid_y[0] - grid_y[1])
        fApprox[lower] = s0 + slope_lower*(x[0, lower] - grid_y[0])
    if any(upper):
        sM1 = s(upper[upper]*grid_y[-1], x[1, upper])[:, 0]
        sM2 = s(upper[upper]*grid_y[-2], x[1, upper])[:, 0]
        slope_upper = (sM1 - sM2)/(grid_y[-1] - grid_y[-2])
        fApprox[upper] = sM1 + slope_upper*(x[0, upper] - grid_y[-1])
    return fApprox

def updated_posterior(prior, y, yPrime):
    """
    This function calculates the posterior weights put on each distribution.
    It is the third function repeatedly called in the loop below.
    """
    z_0 = yPrime/(y * np.exp(mu[0]))
    z_1 = yPrime/(y * np.exp(mu[1]))
    l0, l1 = phi.pdf(z_0), phi.pdf(z_1)
    posterior = l0*prior / (l0*prior + l1*(1-prior))
    return posterior

Tf = np.empty_like(f)
Af = lambda x: Approximationf(f, x)
# == Apply the T operator to f == #
for idxp, prior in enumerate(grid_prior):
    for idxy, y in enumerate(grid_y):
        posterior = lambda yPrime: updated_posterior(prior, y, yPrime)
        integrateL = integrate(lambda z: Af(np.array([y*np.exp(mu[0])*z,
                                            posterior(y*np.exp(mu[0]) * z)])))
        integrateH = integrate(lambda z: Af(np.array([y*np.exp(mu[1])*z,
                                            posterior(y * np.exp(mu[1])*z)])))
        Tf[idxy, idxp] = (h[idxy, idxp] +
                          beta * ((prior * integrateL) +
                                  (1-prior)*integrateH))

Some experience with multiprocessing

Following reptilicus' comment, I decided to look into the multiprocessing module. My idea was to start by computing the array of integrateL values in parallel. To do so, I fixed the outer loop at prior = 0.5 and wanted to iterate the inner loop over grid_y. However, I still had to account for the lambda function in z inside integrateL. Following the suggestions in the Stack Overflow question "How to let Pool.map take a lambda function", I tried to write the following code:

from multiprocessing import Pool

prior = 0.5
Af = lambda x: Approximationf(f, x)

class Iteration(object):
    def __init__(self,state):
        self.y = state
    def __call__(self,z):
        Af(np.array([self.y*np.exp(mu[0])*z, 
                    updated_posterior(prior,
                                      self.y,self.y*np.exp(mu[0])*z)]))

with Pool(processes=4) as pool:
    out = pool.map(Iteration(y), np.nditer(grid_y))

Unfortunately, when I run the program, Python returns:

IndexError: tuple index out of range

At first sight this smells like a trivial error, but I have not been able to remedy it. Does anyone know how to fix this? Again, thank you for any advice!
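For what it's worth, the traceback is consistent with Af receiving a 1-D input: np.nditer(grid_y) hands __call__ one scalar z at a time, so np.array([...]) has shape (2,) and the x.shape[1] lookup inside Approximationf raises IndexError: tuple index out of range. A minimal sketch of one way around this, assuming the goal is to parallelize over grid_y and let each worker run the full quadrature for its grid point (integrateL_at is a hypothetical helper; the module-level definitions from the listing above are assumed to be available):

from multiprocessing import Pool

prior = 0.5

def integrateL_at(y):
    # Each worker evaluates the full quadrature for one grid point y, so Af
    # still receives a 2 x n array of sample points, as in the serial loop.
    return integrate(lambda z: Af(np.array([y*np.exp(mu[0])*z,
                                            updated_posterior(prior, y, y*np.exp(mu[0])*z)])))

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        integrateL_values = pool.map(integrateL_at, grid_y)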

I would target the nested loop, something like this. This is pseudo-code, but it should get you started.

import multiprocessing

def do_calc(idxp, idxy, y, prior):
    posterior = lambda yPrime: updated_posterior(prior, y, yPrime)
    integrateL = integrate(lambda z: Af(np.array([y*np.exp(mu[0])*z,
                                            posterior(y*np.exp(mu[0]) * z)])))
    integrateH = integrate(lambda z: Af(np.array([y*np.exp(mu[1])*z,
                                            posterior(y * np.exp(mu[1])*z)])))
    # Return only picklable results (the posterior lambda cannot be sent back)
    return (idxp, idxy, integrateL, integrateH)

pool = multiprocessing.Pool(8)  # or however many cores you have
results = []
# This is the part that I would try to parallelize
for idxp, prior in enumerate(grid_prior):
    for idxy, y in enumerate(grid_y):
        results.append(pool.apply_async(do_calc, args=(idxp, idxy, y, prior)))
pool.close()
pool.join()
results = [r.get() for r in results]
for idxp, idxy, integrateL, integrateH in results:
    prior = grid_prior[idxp]
    Tf[idxy, idxp] = (h[idxy, idxp] +
                      beta * ((prior * integrateL) +
                              (1-prior)*integrateH))
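Independently of parallelization, one observation on the posted code: Approximationf rebuilds the SmoothBivariateSpline from the same grid data on every call, i.e. at every quadrature evaluation of every grid point, even though f is fixed within one application of the T operator. A minimal sketch of hoisting that construction out of the hot path is below; Approximationf_with_spline is a hypothetical variant of Approximationf that accepts the prebuilt spline instead of constructing it:

# Build the interpolant once per application of the T operator ...
y_tile = np.tile(grid_y, len(grid_prior))
prior_repeat = np.repeat(grid_prior, len(grid_y))
s = interpolate.SmoothBivariateSpline(y_tile, prior_repeat,
                                      f.T.flatten(), kx=5, ky=5)

# ... and reuse it for every evaluation instead of rebuilding it each call
Af = lambda x: Approximationf_with_spline(s, x)  # hypothetical variant of Approximationf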