用于随机数生成的自定义 numpy(或 scipy?)概率分布

Custom numpy (or scipy?) probability distribution for random number generation

问题

Tl;dr:我想要一个函数,它在一个区间内随机 returns 一个浮点数(或可选的浮点数数组),概率分布类似于“高斯”分布和均匀分布的总和。

函数(或class)——比方说custom_distr()——应该有输入(默认值已经给出):

输出是标量或 ndarray,具体取决于大小。

必须对输出进行缩放以证明累积分布等于 1(我不确定该怎么做)。

请注意,我遵循 numpy.random.Generatoruniformnormal 发行版的命名约定作为参考,但命名法和使用的包对我来说并不重要。

我试过的

由于找不到直接“添加”numpy.random.Generator的均匀分布和高斯分布的方法,我尝试使用 scipy.stats.rv_continuous subclassing,但我我卡在如何定义 _rvs 方法或 _ppf 方法以使其快速。

根据我对 rv_continuous class definition in Github 的了解,_rvs 使用 numpyrandom.RandomState(与 random.Generator 相比已过时) 进行分配。这似乎违背了使用 scipy.stats.rv_continuous subclassing.

的目的

另一个选项是定义 _ppf,我的自定义分布的百分比函数,因为根据 rv_generic class definition in Github,默认函数 _rvs 使用 _ppf .但是我在手动定义这个函数时遇到了麻烦。

下面是一个 MWE,使用 low=0.0high=1.0loc=0.3scale=0.02 进行了测试。名称与“问题”部分不同,因为术语的术语在 numpyscipy 之间不同。

import numpy as np
from scipy.stats import rv_continuous
import scipy.special as sc
import matplotlib.pyplot as plt
import time


# The class definition
class custom_distr(rv_continuous):
    def __init__(self, my_loc=0.5, my_scale=0.5, a=0.0, b=1.0, *args, **kwargs):
        super(custom_distr, self).__init__(a, b, *args, **kwargs)
        self.a = a
        self.b = b
        self.my_loc = my_loc
        self.my_scale = my_scale

    def _pdf(self, x):
        # uniform distribution
        aux = 1/(self.b-self.a)
        # gaussian distribution
        aux += 1/np.sqrt(2*np.pi*self.my_scale**2) * \
                 np.exp(-(x-self.my_loc)**2/2/self.my_scale**2)
        return aux/2  # divide by 2?

    def _cdf(self, x):
        # uniform distribution
        aux = (x-self.a)/(self.b-self.a)
        # gaussian distribution
        aux += 0.5*(1+sc.erf((x-self.my_loc)/(self.my_scale*np.sqrt(2))))
        return aux/2  # divide by 2?


# Testing the class
if __name__ == "__main__":
    my_cust_distr = custom_distr(name="my_dist", my_loc=0.3, my_scale=0.02)

    x = np.linspace(0.0, 1.0, 10000)

    start_t = time.time()
    the_pdf = my_cust_distr.pdf(x)
    print("PDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_pdf, label='pdf')

    start_t = time.time()
    the_cdf = my_cust_distr.cdf(x)
    print("CDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_cdf, 'r', alpha=0.8, label='cdf')

    # Get 10000 random values according to the custom distribution
    start_t = time.time()
    r = my_cust_distr.rvs(size=10000)
    print("RVS calc time: {:4.4f}".format(time.time()-start_t))

    plt.hist(r, density=True, histtype='stepfilled', alpha=0.3, bins=40)

    plt.ylim([0.0, the_pdf.max()])
    plt.grid(which='both')
    plt.legend()

    print("Maximum of CDF is: {:2.1f}".format(the_cdf[-1]))

    plt.show()

生成的图像是:

输出为:

PDF calc time: 0.0010
CDF calc time: 0.0010
RVS calc time: 11.1120
Maximum of CDF is: 1.0

我的方法计算 RVS 方法的时间太慢了。

According to Wikipedia,当 cdf 单调递增时,ppf 或百分比函数(也称为 Quantile 函数)可以写成累积分布函数 (cdf) 的反函数。

从问题中显示的图中,我的自定义分布函数的 cdf 确实单调增加 - 正如预期的那样,因为高斯分布和均匀分布的 cdf 也是如此。

“四分位数函数”下的一般正态分布 can be found in this Wikipedia page 的 ppf。定义在 ab 之间的统一函数的 ppf 可以简单地计算为 p*(b-a)+a,其中 p 是所需的概率。

但是 两个函数之和的反函数,不能(通常)简单地写成反函数! See this Mathematics Exchange post 获取更多信息。

因此,到目前为止,我找到的部分“解决方案”是在实例化对象时保存一个包含自定义分布的 cdf 的数组,然后通过一维插值找到 ppf(即 cdf 的反函数) ,只要 cdf 确实是单调递增函数,它就有效。

注意 1: 我还没有解决 Peter O 提到的边界检查问题

注意 2:如果给出 loc 的 ndarray,则建议的解决方案是不可行的,因为缺少四分位数的封闭形式表达式功能。所以,原题还是悬着。

现在的工作代码是:

import numpy as np
from scipy.stats import rv_continuous
import scipy.special as sc
import matplotlib.pyplot as plt
import time


# The class definition
class custom_distr(rv_continuous):
    def __init__(self, my_loc=0.5, my_scale=0.5, a=0.0, b=1.0,
                 init_ppf=1000, *args, **kwargs):
        super(custom_distr, self).__init__(a, b, *args, **kwargs)
        self.a = a
        self.b = b
        self.my_loc = my_loc
        self.my_scale = my_scale
        self.x = np.linspace(a, b, init_ppf)
        self.cdf_arr = self._cdf(self.x)

    def _pdf(self, x):
        # uniform distribution
        aux = 1/(self.b-self.a)
        # gaussian distribution
        aux += 1/np.sqrt(2*np.pi)/self.my_scale * \
                 np.exp(-0.5*((x-self.my_loc)/self.my_scale)**2)
        return aux/2  # divide by 2?

    def _cdf(self, x):
        # uniform distribution
        aux = (x-self.a)/(self.b-self.a)
        # gaussian distribution
        aux += 0.5*(1+sc.erf((x-self.my_loc)/(self.my_scale*np.sqrt(2))))
        return aux/2  # divide by 2?

    def _ppf(self, p):
        if np.any((p<0.0) | (p>1.0)):
            raise RuntimeError("Quantile function accepts only values between 0 and 1")
        return np.interp(p, self.cdf_arr, self.x)


# Testing the class
if __name__ == "__main__":
    a = 1.0
    b = 3.0
    my_loc = 1.5
    my_scale = 0.02

    my_cust_distr = custom_distr(name="my_dist", a=a, b=b,
                                 my_loc=my_loc, my_scale=my_scale)

    x = np.linspace(a, b, 10000)

    start_t = time.time()
    the_pdf = my_cust_distr.pdf(x)
    print("PDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_pdf, label='pdf')

    start_t = time.time()
    the_cdf = my_cust_distr.cdf(x)
    print("CDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_cdf, 'r', alpha=0.8, label='cdf')

    start_t = time.time()
    r = my_cust_distr.rvs(size=10000)
    print("RVS calc time: {:4.4f}".format(time.time()-start_t))

    plt.hist(r, density=True, histtype='stepfilled', alpha=0.3, bins=100)

    plt.ylim([0.0, the_pdf.max()])
    # plt.xlim([a, b])
    plt.grid(which='both')
    plt.legend()

    print("Maximum of CDF is: {:2.1f}".format(the_cdf[-1]))

    plt.show()

生成的图像是:

输出为:

PDF calc time: 0.0010
CDF calc time: 0.0010
RVS calc time: 0.0010
Maximum of CDF is: 1.0

代码比以前更快,代价是使用更多内存。