如何减少 2D 连接域上积分的积分时间

How to reduce integration time for integration over 2D connected domains

我需要在简单连接的域(大部分时间是凸的)上计算许多二维积分。我正在使用 python 函数 scipy.integrate.nquad 来执行此集成。但是,与矩形域上的积分相比,此操作所需的时间要长得多。有没有可能更快的实施?

这是一个例子;我首先在圆形域(使用函数内部的约束)和矩形域(nquad 函数的默认域)上对常量函数进行积分。

from scipy import integrate
import time

def circular(x,y,a):
  if x**2 + y**2 < a**2/4:
    return 1 
  else:
    return 0

def rectangular(x,y,a):
  return 1

a = 4
start = time.time()
result = integrate.nquad(circular, [[-a/2, a/2],[-a/2, a/2]], args=(a,))
now = time.time()
print(now-start)

start = time.time()
result = integrate.nquad(rectangular, [[-a/2, a/2],[-a/2, a/2]], args=(a,))
now = time.time()
print(now-start)

矩形域只需要 0.00029 秒,而圆形域需要 2.07061 秒才能完成。

循环积分也给出以下警告:

IntegrationWarning: The maximum number of subdivisions (50) has been achieved.
If increasing the limit yields no improvement it is advised to analyze 
the integrand in order to determine the difficulties.  If the position of a 
local difficulty can be determined (singularity, discontinuity) one will 
probably gain from splitting up the interval and calling the integrator 
on the subranges.  Perhaps a special-purpose integrator should be used.
**opt)

加快计算速度的一种方法是使用 numba,这是 Python 的即时编译器。

@jit装饰器

Numba 提供了一个 @jit decorator 来编译一些 Python 代码并输出优化的机器代码,可以 运行 在几个 CPU 上并行。 jitting 被积函数只需要很少的努力,并且可以节省一些时间,因为代码被优化为 运行 更快。人们甚至不必担心类型,Numba 会在后台完成所有这些工作。

from scipy import integrate
from numba import jit

@jit
def circular_jit(x, y, a):
    if x**2 + y**2 < a**2 / 4:
        return 1 
    else:
        return 0

a = 4
result = integrate.nquad(circular_jit, [[-a/2, a/2],[-a/2, a/2]], args=(a,))

这 运行 确实更快,在我的机器上计时时,我得到:

 Original circular function: 1.599048376083374
 Jitted circular function: 0.8280022144317627

计算时间减少了约 50%。

Scipy的LowLevelCallable

由于语言的性质,Python 中的函数调用非常耗时。与 C 等编译语言相比,开销有时会使 Python 代码变慢。

为了缓解这种情况,Scipy 提供了 LowLevelCallable class 可用于提供对低级编译回调函数的访问。通过这种机制,Python的函数调用开销被绕过,可以进一步节省时间。

请注意,在 nquad 的情况下,传递给 LowerLevelCallablecfunc 的签名必须是以下之一:

double func(int n, double *xx)
double func(int n, double *xx, void *user_data)

其中 int 是参数的数量,参数的值在第二个参数中。 user_data 用于需要上下文操作的回调。

因此,我们可以稍微更改 Python 中的循环函数签名以使其兼容。

from scipy import integrate, LowLevelCallable
from numba import cfunc
from numba.types import intc, CPointer, float64


@cfunc(float64(intc, CPointer(float64)))
def circular_cfunc(n, args):
    x, y, a = (args[0], args[1], args[2]) # Cannot do `(args[i] for i in range(n))` as `yield` is not supported
    if x**2 + y**2 < a**2/4:
        return 1 
    else:
        return 0

circular_LLC = LowLevelCallable(circular_cfunc.ctypes)

a = 4
result = integrate.nquad(circular_LLC, [[-a/2, a/2],[-a/2, a/2]], args=(a,))

用这个方法我得到

LowLevelCallable circular function: 0.07962369918823242

与原始版本相比减少了 95%,与函数的 jitted 版本相比减少了 90%。

定制装饰器

为了使代码更整洁并保持被积函数签名的灵活性,可以创建一个定制的装饰器函数。它将 jit 被积函数并将其包装到一个 LowLevelCallable 对象中,然后可以与 nquad.

一起使用
from scipy import integrate, LowLevelCallable
from numba import cfunc, jit
from numba.types import intc, CPointer, float64

def jit_integrand_function(integrand_function):
    jitted_function = jit(integrand_function, nopython=True)

    @cfunc(float64(intc, CPointer(float64)))
    def wrapped(n, xx):
        return jitted_function(xx[0], xx[1], xx[2])
    return LowLevelCallable(wrapped.ctypes)


@jit_integrand_function
def circular(x, y, a):
    if x**2 + y**2 < a**2 / 4:
        return 1
    else:
        return 0

a = 4
result = integrate.nquad(circular, [[-a/2, a/2],[-a/2, a/2]], args=(a,))

任意数量的参数

如果参数个数未知,那么我们可以使用 Numba 提供的便捷 carray functionCPointer(float64) 转换为 Numpy 数组。

import numpy as np
from scipy import integrate, LowLevelCallable
from numba import cfunc, carray, jit
from numba.types import intc, CPointer, float64

def jit_integrand_function(integrand_function):
    jitted_function = jit(integrand_function, nopython=True)

    @cfunc(float64(intc, CPointer(float64)))
    def wrapped(n, xx):
        ar = carray(xx, n)
        return jitted_function(ar[0], ar[1], ar[2:])
    return LowLevelCallable(wrapped.ctypes)


@jit_integrand_function
def circular(x, y, a):
    if x**2 + y**2 < a[-1]**2 / 4:
        return 1
    else:
        return 0

ar = np.array([1, 2, 3, 4])
a = ar[-1]
result = integrate.nquad(circular, [[-a/2, a/2],[-a/2, a/2]], args=ar)