自定义 Theano Op 进行数值积分

Custom Theano Op to do numerical integration

我正在尝试编写一个自定义 Theano Op,它对两个值之间的函数进行数值积分。 Op 是 PyMC3 的自定义可能性,它涉及一些积分的数值评估。我不能简单地使用 @as_op 装饰器,因为我需要使用 HMC 来执行 MCMC 步骤。任何帮助将不胜感激,因为这个问题似乎已经多次出现但从未得到解决(例如 , Theano: implementing an integral function)。

显然,一种解决方案是在 Theano 中编写一个数值积分器,但是当已经有非常好的积分器可用时,这似乎是一种浪费,例如通过 scipy.integrate.

为了将此作为最小示例,让我们尝试在 Op 中集成一个介于 0 和 1 之间的函数。以下在 Op 之外集成了一个 Theano 函数,并根据我的测试产生了正确的结果。

import theano
import theano.tensor as tt
from scipy.integrate import quad

x = tt.dscalar('x')
y = x**4 # integrand
f = theano.function([x], y)

print f(0)
print f(1)

ans = integrate.quad(f, 0, 1)[0]

print ans

但是,尝试在 Op 中进行集成似乎要困难得多。我目前的最大努力是:

import numpy as np
import theano
import theano.tensor as tt
from scipy import integrate

class IntOp(theano.Op):
    __props__ = ()

    def make_node(self, x):
        x = tt.as_tensor_variable(x)
        return theano.Apply(self, [x], [x.type()])

    def perform(self, node, inputs, output_storage):
        x = inputs[0]
        z = output_storage[0]

        f_to_int = theano.function([x], x)
        z[0] = tt.as_tensor_variable(integrate.quad(f_to_int, 0, 1)[0])

    def infer_shape(self, node, i0_shapes):
        return i0_shapes

    def grad(self, inputs, output_grads):
        ans = integrate.quad(output_grads[0], 0, 1)[0]
        return [ans]

intOp = IntOp()

x = tt.dmatrix('x')
y = intOp(x)

f = theano.function([x], y)

inp = np.asarray([[2, 4], [6, 8]], dtype=theano.config.floatX)
out = f(inp)

print inp
print out

出现以下错误:

Traceback (most recent call last):
  File "Whosebug.py", line 35, in <module>
    out = f(inp)
  File "/usr/local/lib/python2.7/dist-packages/theano/compile/function_module.py", line 871, in __call__
    storage_map=getattr(self.fn, 'storage_map', None))
  File "/usr/local/lib/python2.7/dist-packages/theano/gof/link.py", line 314, in raise_with_op
    reraise(exc_type, exc_value, exc_trace)
  File "/usr/local/lib/python2.7/dist-packages/theano/compile/function_module.py", line 859, in __call__
    outputs = self.fn()
  File "/usr/local/lib/python2.7/dist-packages/theano/gof/op.py", line 912, in rval
    r = p(n, [x[0] for x in i], o)
  File "Whosebug.py", line 17, in perform
    f_to_int = theano.function([x], x)
  File "/usr/local/lib/python2.7/dist-packages/theano/compile/function.py", line 320, in function
    output_keys=output_keys)
  File "/usr/local/lib/python2.7/dist-packages/theano/compile/pfunc.py", line 390, in pfunc
    for p in params]
  File "/usr/local/lib/python2.7/dist-packages/theano/compile/pfunc.py", line 489, in _pfunc_param_to_in
    raise TypeError('Unknown parameter type: %s' % type(param))
TypeError: Unknown parameter type: <type 'numpy.ndarray'>
Apply node that caused the error: IntOp(x)
Toposort index: 0
Inputs types: [TensorType(float64, matrix)]
Inputs shapes: [(2, 2)]
Inputs strides: [(16, 8)]
Inputs values: [array([[ 2.,  4.],
       [ 6.,  8.]])]
Outputs clients: [['output']]

Backtrace when the node is created(use Theano flag traceback.limit=N to make it longer):
  File "Whosebug.py", line 30, in <module>
    y = intOp(x)
  File "/usr/local/lib/python2.7/dist-packages/theano/gof/op.py", line 611, in __call__
    node = self.make_node(*inputs, **kwargs)
  File "Whosebug.py", line 11, in make_node
    return theano.Apply(self, [x], [x.type()])

HINT: Use the Theano flag 'exception_verbosity=high' for a debugprint and storage map footprint of this apply node.

我对此感到惊讶,尤其是 TypeError,因为我认为我已经将 output_storage 变量转换为张量,但它似乎在这里相信它仍然是一个 ndarray。

我发现了你的问题,因为我正试图在 PyMC3 中构建一个代表一般点过程(Hawkes、Cox、Poisson 等)的随机变量,并且似然函数具有积分。我真的希望能够使用 Hamiltonian Monte Carlo 或 NUTS 采样器,所以我需要关于时间的积分是可微的。

从您的尝试开始,我制作了一个 integrateOut theano Op,它似乎可以正确处理我需要的行为。我已经在几个不同的输入上对其进行了测试(目前还没有在我的统计模型上进行测试,但它看起来很有希望!)。我完全是 theano n00b,所以请原谅我的愚蠢行为。如果有人有任何反馈,我将不胜感激。不确定这正是您要查找的内容,但这是我的解决方案(底部和文档字符串中的示例)。 *编辑:简化了一些解决方法的残余。

import theano
import theano.tensor as T
from scipy.integrate import quad

class integrateOut(theano.Op):
    """
    Integrate out a variable from an expression, computing
    the definite integral w.r.t. the variable specified
    !!! Only implemented in this for scalars !!!


    Parameters
    ----------
    f : scalar
        input 'function' to integrate
    t : scalar
        the variable to integrate out
    t0: float
        lower integration limit
    tf: float
        upper integration limit

    Returns
    -------
    scalar
        a new scalar with the 't' integrated out

    Notes
    -----

    usage of this looks like:
    x = T.dscalar('x')
    y = T.dscalar('y')
    t = T.dscalar('t')

    z = (x**2 + y**2)*t

    # integrate z w.r.t. t as a function of (x,y)
    intZ = integrateOut(z,t,0.0,5.0)(x,y)
    gradIntZ = T.grad(intZ,[x,y])

    funcIntZ = theano.function([x,y],intZ)
    funcGradIntZ = theano.function([x,y],gradIntZ)

    """
    def __init__(self,f,t,t0,tf,*args,**kwargs):
        super(integrateOut,self).__init__()
        self.f = f
        self.t = t
        self.t0 = t0
        self.tf = tf

    def make_node(self,*inputs):
        self.fvars=list(inputs)
        # This will fail when taking the gradient... don't be concerned
        try:
            self.gradF = T.grad(self.f,self.fvars)
        except:
            self.gradF = None
        return theano.Apply(self,self.fvars,[T.dscalar().type()])

    def perform(self,node, inputs, output_storage):
        # Everything else is an argument to the quad function
        args = tuple(inputs)
        # create a function to evaluate the integral
        f = theano.function([self.t]+self.fvars,self.f)
        # actually compute the integral
        output_storage[0][0] = quad(f,self.t0,self.tf,args=args)[0]

    def grad(self,inputs,grads):
        return [integrateOut(g,self.t,self.t0,self.tf)(*inputs)*grads[0] \
            for g in self.gradF]

x = T.dscalar('x')
y = T.dscalar('y')
t = T.dscalar('t')

z = (x**2+y**2)*t

intZ = integrateOut(z,t,0,1)(x,y)
gradIntZ = T.grad(intZ,[x,y])
funcIntZ = theano.function([x,y],intZ)
funcGradIntZ = theano.function([x,y],gradIntZ)
print funcIntZ(2,2)
print funcGradIntZ(2,2)

SymPy 证明比预期的要难,但与此同时,如果有人发现它有用,我还将指出如何修改此 Op 以允许更改最终时间点而无需创建新的 Op。如果您有一个点过程,或者如果您的时间测量不确定,这会很有用。

class integrateOut2(theano.Op):
    def __init__(self, f, int_var, *args,**kwargs):
        super(integrateOut2,self).__init__()
        self.f = f
        self.int_var = int_var

    def make_node(self, *inputs):
        tmax = inputs[0]
        self.fvars=list(inputs[1:])

        return theano.Apply(self, [tmax]+self.fvars, [T.dscalar().type()])

    def perform(self, node, inputs, output_storage):
        # Everything else is an argument to the quad function
        tmax = inputs[0]
        args = tuple(inputs[1:])

        # create a function to evaluate the integral
        f = theano.function([self.int_var]+self.fvars, self.f)

        # actually compute the integral
        output_storage[0][0] = quad(f, 0., tmax, args=args)[0]

    def grad(self, inputs, grads):
        tmax = inputs[0]
        param_grads = T.grad(self.f, self.fvars)

        ## Recall fundamental theorem of calculus
        ## d/dt \int^{t}_{0}f(x)dx = f(t)
        ## So sub in t_max to the graph
        FTC_grad = theano.clone(self.f, {self.int_var: tmax})

        grad_list = [FTC_grad*grads[0]] + \
                    [integrateOut2(grad_fn, self.int_var)(*inputs)*grads[0] \
                     for grad_fn in param_grads]

        return grad_list

我总是使用以下代码,其中我从 µ = 1 和 σ 2 = 2.25 的正态分布生成 B = 10000 个 n = 30 个观察样本。对于每个样本,参数 µ 和 σ 被估计并存储在矩阵中。希望对您有所帮助。

loglik <- function(p,z){
sum(dnorm(z,mean=p[1],sd=p[2],log=TRUE))
}
set.seed(45)
n <- 30
x <- rnorm(n,mean=1,sd=1.5)
optim(c(mu=0,sd=1),loglik,control=list(fnscale=-1),z=x)

B <- 10000
bootstrap.results <- matrix(NA,nrow=B,ncol=3)
colnames(bootstrap.results) <- c("mu","sigma","convergence")
for (b in 1:B){
sample.b <- rnorm(n,mean=1,sd=1.5)
m.b <- optim(c(mu=0,sd=1),loglik,control=list(fnscale=-1),z=sample.b)
bootstrap.results[b,] <- c(m.b$par,m.b$convergence)
}

还可以获得 λ 的 ML 估计值,并使用 bootstrap 来估计估计值的偏差和标准误差。首先计算 λ 的 MLE 然后,我们通过非参数 bootstrap.

估计 λ^ 的偏差和标准误差
B <- 9999
lambda.B <- rep(NA,B)
n <- length(w.time)
for (b in 1:B){
b.sample <- sample(1:n,n,replace=TRUE)
lambda.B[b] <- 1/mean(w.time[b.sample])
}
bias <- mean(lambda.B-m$estimate)
sd(lambda.B)

在第二部分中,我们计算了平均故障间隔时间的 95% 置信区间。

n <- length(w.time)
m <- mean(w.time)
se <- sd(w.time)/sqrt(n)
interval.1 <- m + se * qnorm(c(0.025,0.975))
interval.1

但我们也可以假设数据来自指数分布。在这种情况下,我们有 varX¯ = 1/(nλ^2) = θ^{2}/n 可以通过 X¯^{2}/n.

来估计
sd.m <- sqrt(m^2/n)
interval.2 <- m + sd.m * qnorm(c(0.025,0.975))
interval.2

我们还可以通过 boostrap 程序估计 ^θ 的标准误差。我们使用非参数bootstrap,即我们从原始样本中抽取有放回的

B <- 9999
m.star <- rep(NA,B)
for (b in 1:B){
m.star[b] <- mean(sample(w.time,replace=TRUE))
}
sd.m.star <- sd(m.star)
interval.3 <- m + sd.m.star * qnorm(c(0.025,0.975))
interval.3
An interval not based on the assumption of normality of ˆθ is obtained by the percentile method:

interval.4 <- quantile(m.star, probs=c(0.025,0.975))
interval.4