加速大都会-- Python 中的黑斯廷斯

Speed up Metropolis--Hastings in Python

我有一些代码使用 MCMC 对后验分布进行采样,特别是 Metropolis Hastings。我使用 scipy 生成随机样本:

import numpy as np
from scipy import stats

def get_samples(n):
    """
    Generate and return a randomly sampled posterior.

    For simplicity, Prior is fixed as Beta(a=2,b=5), Likelihood is fixed as Normal(0,2)

    :type n: int
    :param n: number of iterations

    :rtype: numpy.ndarray
    """
    x_t = stats.uniform(0,1).rvs() # initial value
    posterior = np.zeros((n,))
    for t in range(n):
        x_prime = stats.norm(loc=x_t).rvs() # candidate
        p1 = stats.beta(a=2,b=5).pdf(x_prime)*stats.norm(loc=0,scale=2).pdf(x_prime) # prior * likelihood 
        p2 = stats.beta(a=2,b=5).pdf(x_t)*stats.norm(loc=0,scale=2).pdf(x_t) # prior * likelihood 
        alpha = p1/p2 # ratio
        u = stats.uniform(0,1).rvs() # random uniform
        if u <= alpha:
            x_t = x_prime # accept
            posterior[t] = x_t
        elif u > alpha:
            x_t = x_t # reject
    posterior = posterior[np.where(posterior > 0)] # get rid of initial zeros that don't contribute to distribution
    return posterior

通常,我尽量避免在 python 中使用显式 for 循环 - 我会尝试使用纯 numpy 生成所有内容。然而,对于这个算法的情况,带有 if 语句的 for 循环是不可避免的。因此,代码相当慢。当我分析我的代码时,它花费大部分时间在 for 循环中(很明显),更具体地说,最慢的部分是生成随机数; stats.beta().pdf()stats.norm().pdf().

有时我使用numba to speed up my code for matrix operations. While numba is compatible with some numpy operations, generating random numbers is not one of them. Numba has a cuda rng,但这仅限于正态分布和均匀分布。

我的问题是,有没有一种方法可以使用与 numba 兼容的各种分布的某种随机抽样来显着加快上面的代码?

我们不必局限于 numba,但它是我所知道的唯一易于使用的优化器。更一般地说,我正在寻找在 python.

中的 for 循环 中加快各种分布(beta、伽马、泊松) 随机抽样的方法

不幸的是,我真的没有看到任何加速随机分布的可能性,除了用 numba 兼容的 python 代码自己重写它们。

但是加速代码瓶颈的一种简单方法是将对统计函数的两次调用替换为:

p1, p2 = (
    stats.beta(a=2, b=5).pdf([x_prime, x_t])
    * stats.norm(loc=0, scale=2).pdf([x_prime, x_t]))

另一个细微的调整可能是将 u 的生成外包到 for 循环之外:

x_t = stats.uniform(0, 1).rvs() # initial value
posterior = np.zeros((n,))
u = stats.uniform(0, 1).rvs(size=n) # random uniform
for t in range(n):  # and so on

然后在循环内索引u(当然循环里的u = stats.uniform(0,1).rvs() # random uniform行要删掉):

if u[t] <= alpha:
    x_t = x_prime # accept
    posterior[t] = x_t
elif u[t] > alpha:
    x_t = x_t # reject

还可以通过省略 elif 语句或在出于其他目的需要时将其替换为 else 来简化 if 条件。但这真的只是一个微小的改进:

if u[t] <= alpha:
    x_t = x_prime # accept
    posterior[t] = x_t

编辑

基于 jwalton 回答的另一个改进:

def new_get_samples(n):
    """
    Generate and return a randomly sampled posterior.

    For simplicity, Prior is fixed as Beta(a=2,b=5), Likelihood is fixed as Normal(0,2)

    :type n: int
    :param n: number of iterations

    :rtype: numpy.ndarray
    """

    x_cur = np.random.uniform()
    innov = norm.rvs(size=n)
    x_prop = x_cur + innov
    u = np.random.uniform(size=n)

    post_cur = beta.pdf(x_cur, a=2,b=5) * norm.pdf(x_cur, loc=0,scale=2)
    post_prop = beta.pdf(x_prop, a=2,b=5) * norm.pdf(x_prop, loc=0,scale=2)

    posterior = np.zeros((n,))
    for t in range(n):        
        alpha = post_prop[t] / post_cur
        if u[t] <= alpha:
            x_cur = x_prop[t]
            post_cur = post_prop[t]
        posterior[t] = x_cur
    return posterior

随着时间的改进:

%timeit my_get_samples(1000)
# 187 ms ± 13 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit my_get_samples2(1000)
# 1.55 ms ± 57.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

这比 jwalton 的答案快了 121 倍。它是通过外包post_prop计算来完成的。

检查直方图,这似乎没问题。但最好问问 jwalton 是否真的可以,他似乎对这个话题有更多的了解。 :)

在开始考虑 numba 等之前,您可以对此代码进行 很多 优化。阿尔。 (仅通过巧妙地实现算法,我设法将此代码的速度提高了 25 倍)

首先,您在实施 Metropolis--Hastings 算法时存在错误。你需要保持每个方案的迭代,不管你的链是否移动。也就是说,您需要从代码中删除 posterior = posterior[np.where(posterior > 0)] 并且在每个循环的末尾都有 posterior[t] = x_t.

其次,这个例子看起来很奇怪。通常,对于这些类型的推理问题,我们希望根据一些观察来推断分布的参数。但是,在这里,分布的参数是已知的,而不是你在抽样观察?不管怎样,不管怎样,我很乐意使用你的示例并向你展示如何加快速度。

加速

首先,从 for 主循环中删除任何不依赖于 t 值的内容。首先从 for 循环中删除随机游走创新的生成:

    x_t = stats.uniform(0,1).rvs()
    innov = stats.norm(loc=0).rvs(size=n)
    for t in range(n):
        x_prime = x_t + innov[t]

当然也可以将u的随机生成移出for循环:

    x_t = stats.uniform(0,1).rvs()
    innov = stats.norm(loc=0).rvs(size=n)

    u = np.random.uniform(size=n)
    for t in range(n):
        x_prime = x_t + innov[t]
        ...
        if u[t] <= alpha:

另一个问题是您在每个循环中计算当前后验p2,这是没有必要的。在每个循环中,您需要计算提议的后验 p1,当提议被接受时,您可以将 p2 更新为等于 p1:

    x_t = stats.uniform(0,1).rvs()
    innov = stats.norm(loc=0).rvs(size=n)

    u = np.random.uniform(size=n)

    p2 = stats.beta(a=2,b=5).pdf(x_t)*stats.norm(loc=0,scale=2).pdf(x_t)
    for t in range(n):
        x_prime = x_t + innov[t]

        p1 = stats.beta(a=2,b=5).pdf(x_prime)*stats.norm(loc=0,scale=2).pdf(x_prime)
        ...
        if u[t] <= alpha:
            x_t = x_prime # accept
            p2 = p1

        posterior[t] = x_t

一个很小的改进可能是将 scipy 统计函数直接导入名称 space:

from scipy.stats import norm, beta

另一个非常小的改进是注意到代码中的 elif 语句不执行任何操作,因此可以将其删除。

把这些放在一起并使用我想出的更合理的变量名:

from scipy.stats import norm, beta
import numpy as np

def my_get_samples(n, sigma=1):

    x_cur = np.random.uniform()
    innov = norm.rvs(size=n, scale=sigma)
    u = np.random.uniform(size=n)

    post_cur = beta.pdf(x_cur, a=2, b=5) * norm.pdf(x_cur, loc=0, scale=2)

    posterior = np.zeros(n)
    for t in range(n):
        x_prop = x_cur + innov[t]

        post_prop = beta.pdf(x_prop, a=2, b=5) * norm.pdf(x_prop, loc=0, scale=2)
        alpha = post_prop / post_cur
        if u[t] <= alpha:
            x_cur = x_prop
            post_cur = post_prop

        posterior[t] = x_cur

    return posterior

现在,进行速度比较:

%timeit get_samples(1000)
3.19 s ± 5.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit my_get_samples(1000)
127 ms ± 484 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

速度提高了 25 倍

ESS

值得注意的是,就 MCMC 算法而言,速度并不是一切。实际上,您感兴趣的是每秒可以从后部进行的独立(ish)绘制的数量。通常,这是用 ESS (effective sample size) 评估的。您可以通过调整随机游走来提高算法的效率(从而增加每秒抽取的有效样本)。

要做到这一点,通常要进行初步试验 运行,即 samples = my_get_samples(1000)。从这个输出计算 sigma = 2.38**2 * np.var(samples)。然后应该使用此值将方案中的随机游走调整为 innov = norm.rvs(size=n, scale=sigma)。看似随意出现的 2.38^2 起源于:

Weak convergence and optimal scaling of random walk Metropolis algorithms (1997). A. Gelman, W. R. Gilks, and G. O. Roberts.

为了说明调整可以带来的改进,让我们对我的算法进行两次 运行 调整,一个调整,另一个未调整,均使用 10000 次迭代:

x = my_get_samples(10000)
y = my_get_samples(10000, sigma=0.12)

fig, ax = plt.subplots(1, 2)
ax[0].hist(x, density=True, bins=25, label='Untuned algorithm', color='C0')
ax[1].hist(y, density=True, bins=25, label='Tuned algorithm', color='C1')
ax[0].set_ylabel('density')
ax[0].set_xlabel('x'), ax[1].set_xlabel('x')
fig.legend()

您可以立即看到调整对我们算法效率的改进。请记住,两个 运行 都是为相同的迭代次数制作的。

最后的想法

如果您的算法需要很长时间才能收敛,或者如果您的样本具有大量自相关,我会考虑研究 Cython 以进一步优化速度。

我还建议查看 PyStan 项目。它需要一点时间来适应,但它的 NUTS HMC 算法可能会胜过任何您可以手写的 Metropolis--Hastings 算法。