在 Numba 中使用 Scipy 个例程
Using Scipy routines with Numba
我正在编写一个程序,其中 Scipy 在某些点使用 CubicSpline 例程,
由于使用了 Scipy 例程,我无法在我的整个程序中使用 Numba @jit。
我最近遇到了@overload 功能,我想知道它是否可以这样使用,
from numba.extending import overload
from numba import jit
from scipy.interpolate import CubicSpline
import numpy as np
x = np.arange(10)
y = np.sin(x)
xs = np.arange(-0.5, 9.6, 0.1)
def Spline_interp(xs,x,y):
cs = CubicSpline(x, y)
ds = cs(xs)
return ds
@overload(Spline_interp)
def jit_Spline_interp(xs,x,y):
ds = Spline_interp(xs,x,y)
def jit_Spline_interp_impl(xs,x, y):
return ds
return jit_Spline_interp_impl
@jit(nopython=True)
def main():
# other codes compatible with @njit
ds = Spline_interp(xs,x,y)
# other codes compatible with @njit
return ds
print(main())
如果我对@overload 功能的理解有误,请指正我,以及将此类 Scipy 库与 Numba 一起使用的可能解决方案是什么。
您需要回退到 object-mode(在本地,如 @max9111 建议的那样),或者您自己在 Numba 中实现 CubicSpline
函数。
据我所知,重载装饰器 "only" 让编译器意识到,如果遇到重载函数,它可以使用与 Numba 兼容的实现。它不会神奇地将函数转换为与 Numba 兼容。
有一个包向 Numba 公开了一些 Scipy 功能,但这似乎还很早,目前只包含一些 scipy.special 功能。
这是我的解决方案的转贴,发布在 numba discourse https://numba.discourse.group/t/call-scipy-splev-routine-in-numba-jitted-function/1122/7。
我最初接受了 @max9111 使用 objmode 的建议。它给出了一个临时修复。但是,由于代码对性能至关重要,我最终编写了 scipy 的 'interpolate.splev' 样条插值子例程的 numba 版本。
import numpy as np
import numba
from scipy import interpolate
import matplotlib.pyplot as plt
import time
# Custom wrap of scipy's splrep
def custom_splrep(x, y, k=3):
"""
Custom wrap of scipy's splrep for calculating spline coefficients,
which also check if the data is equispaced.
"""
# Check if x is equispaced
x_diff = np.diff(x)
equi_spaced = all(np.round(x_diff,5) == np.round(x_diff[0],5))
dx = x_diff[0]
# Calculate knots & coefficients (cubic spline by default)
t,c,k = interpolate.splrep(x,y, k=k)
return (t,c,k,equi_spaced,dx)
# Numba accelerated implementation of scipy's splev
@numba.njit(cache=True)
def numba_splev(x, coeff):
"""
Custom implementation of scipy's splev for spline interpolation,
with additional section for faster search of knot interval, if knots are equispaced.
Spline is extrapolated from the end spans for points not in the support.
"""
t,c,k, equi_spaced, dx = coeff
t0 = t[0]
n = t.size
m = x.size
k1 = k+1
k2 = k1+1
nk1 = n - k1
l = k1
l1 = l+1
y = np.zeros(m)
h = np.zeros(20)
hh = np.zeros(19)
for i in range(m):
# fetch a new x-value arg
arg = x[i]
# search for knot interval t[l] <= arg <= t[l+1]
if(equi_spaced):
l = int((arg-t0)/dx) + k
l = min(max(l, k1), nk1)
else:
while not ((arg >= t[l-1]) or (l1 == k2)):
l1 = l
l = l-1
while not ((arg < t[l1-1]) or (l == nk1)):
l = l1
l1 = l+1
# evaluate the non-zero b-splines at arg.
h[:] = 0.0
hh[:] = 0.0
h[0] = 1.0
for j in range(k):
for ll in range(j+1):
hh[ll] = h[ll]
h[0] = 0.0
for ll in range(j+1):
li = l + ll
lj = li - j - 1
if(t[li] != t[lj]):
f = hh[ll]/(t[li]-t[lj])
h[ll] += f*(t[li]-arg)
h[ll+1] = f*(arg-t[lj])
else:
h[ll+1] = 0.0
break
sp = 0.0
ll = l - 1 - k1
for j in range(k1):
ll += 1
sp += c[ll]*h[j]
y[i] = sp
return y
######################### Testing and comparison #############################
# Generate a data set for interpolation
x, dx = np.linspace(10,100,200, retstep=True)
y = np.sin(x)
# Calculate the cubic spline spline coeff's
coeff_1 = interpolate.splrep(x,y, k=3) # scipy's splrep
coeff_2 = custom_splrep(x,y, k=3) # Custom wrap of scipy's splrep
# Generate data for interpolation and randomize
x2 = np.linspace(0,110,10000)
np.random.shuffle(x2)
# Interpolate
y2 = interpolate.splev(x2, coeff_1) # scipy's splev
y3 = numba_splev(x2, coeff_2) # Numba accelerated implementation of scipy's splev
# Plot data
plt.plot(x,y,'--', linewidth=1.0,color='green', label='data')
plt.plot(x2,y2,'o',color='blue', markersize=2.0, label='scipy splev')
plt.plot(x2,y3,'.',color='red', markersize=1.0, label='numba splev')
plt.legend()
plt.show()
print("\nTime for random interpolations")
# Calculation time evaluation for scipy splev
t1 = time.time()
for n in range(0,10000):
y2 = interpolate.splev(x2, coeff_1)
print("scipy splev", time.time() - t1)
# Calculation time evaluation for numba splev
t1 = time.time()
for n in range(0,10000):
y2 = numba_splev(x2, coeff_2)
print("numba splev",time.time() - t1)
print("\nTime for non random interpolations")
# Generate data for interpolation without randomize
x2 = np.linspace(0,110,10000)
# Calculation time evaluation for scipy splev
t1 = time.time()
for n in range(0,10000):
y2 = interpolate.splev(x2, coeff_1)
print("scipy splev", time.time() - t1)
# Calculation time evaluation for numba splev
t1 = time.time()
for n in range(0,10000):
y2 = numba_splev(x2, coeff_2)
print("numba splev",time.time() - t1)
如果节点是等距的,上面的代码针对更快的节点搜索进行了优化。
在我的 corei7 机器上,如果插值是随机值完成的,numba 版本更快,
Scipy的splev = 0.896s
Numba splev = 0.375s
如果插值不是随机值scipy的版本更快,
Scipy 的 splev = 0.281s
Numba splev = 0.375s
参考:https://github.com/scipy/scipy/tree/v1.7.1/scipy/interpolate/fitpack,
https://github.com/dbstein/fast_splines
使用 ctypes (Numba) 包装编译函数
特别是对于更复杂的功能,重新实现 numba-compileable Python 代码中的所有内容可能需要大量工作,而且有时会更慢。以下答案将直接从共享对象或动态库调用 C-like 函数。
编译 fortran 例程
此示例将展示在 windows 上执行此操作的方法,但在其他 os 上应该是直截了当的。对于定义 ISO_C_BINDING 的可移植接口,强烈推荐。在这个答案中,我将在没有界面的情况下尝试。
dll.def
EXPORTS
SPLEV @1
编译
ifort /dll dll.def splev.f fpbspl.f /O3 /fast
直接从 Numba 调用此函数
- 看看 Fortran 例程需要什么
- 检查包装器中的每个输入(数据类型、连续性)。您只需提供一些指向 fortran 函数的指针。没有额外的安全检查。
包装器
下面的代码展示了调用这个函数的两种方式。在 Numba 中,它不是直接 possible to pass scalar by reference。您可以在堆上分配一个数组(对于小函数来说很慢),或者使用内部函数来使用堆栈数组。
import numba as nb
import numpy as np
import ctypes
lib = ctypes.cdll.LoadLibrary("splev.dll")
dble_p=ctypes.POINTER(ctypes.c_double)
int_p =ctypes.POINTER(ctypes.c_longlong)
SPLEV=lib.SPLEV
SPLEV.restype = ctypes.c_void_p
SPLEV.argtypes = (dble_p,int_p,dble_p,int_p,dble_p,dble_p,int_p,int_p,int_p)
from numba import types
from numba.extending import intrinsic
from numba.core import cgutils
@intrinsic
def val_to_ptr(typingctx, data):
def impl(context, builder, signature, args):
ptr = cgutils.alloca_once_value(builder,args[0])
return ptr
sig = types.CPointer(nb.typeof(data).instance_type)(nb.typeof(data).instance_type)
return sig, impl
@intrinsic
def ptr_to_val(typingctx, data):
def impl(context, builder, signature, args):
val = builder.load(args[0])
return val
sig = data.dtype(types.CPointer(data.dtype))
return sig, impl
#with intrinsics, temporary arrays are allocated on stack
#faster but much more relevant for functions with very low runtime
@nb.njit()
def splev_wrapped(x, coeff,e):
#There are just pointers passed to the fortran function.
#The arrays have to be contiguous!
t=np.ascontiguousarray(coeff[0])
x=np.ascontiguousarray(x)
c=coeff[1]
k=coeff[2]
y=np.empty(x.shape[0],dtype=np.float64)
n_arr=val_to_ptr(nb.int64(t.shape[0]))
k_arr=val_to_ptr(nb.int64(k))
m_arr=val_to_ptr(nb.int64(x.shape[0]))
e_arr=val_to_ptr(nb.int64(e))
ier_arr=val_to_ptr(nb.int64(0))
SPLEV(t.ctypes,n_arr,c.ctypes,k_arr,x.ctypes,
y.ctypes,m_arr,e_arr,ier_arr)
return y, ptr_to_val(ier_arr)
#without using intrinsics
@nb.njit()
def splev_wrapped_2(x, coeff,e):
#There are just pointers passed to the fortran function.
#The arrays have to be contiguous!
t=np.ascontiguousarray(coeff[0])
x=np.ascontiguousarray(x)
c=coeff[1]
k=coeff[2]
y=np.empty(x.shape[0],dtype=np.float64)
n_arr = np.empty(1, dtype=np.int64)
k_arr = np.empty(1, dtype=np.int64)
m_arr = np.empty(1, dtype=np.int64)
e_arr = np.empty(1, dtype=np.int64)
ier_arr = np.zeros(1, dtype=np.int64)
n_arr[0]=t.shape[0]
k_arr[0]=k
m_arr[0]=x.shape[0]
e_arr[0]=e
SPLEV(t.ctypes,n_arr.ctypes,c.ctypes,k_arr.ctypes,x.ctypes,
y.ctypes,m_arr.ctypes,e_arr.ctypes,ier_arr.ctypes)
return y, ier_arr[0]
我正在编写一个程序,其中 Scipy 在某些点使用 CubicSpline 例程, 由于使用了 Scipy 例程,我无法在我的整个程序中使用 Numba @jit。
我最近遇到了@overload 功能,我想知道它是否可以这样使用,
from numba.extending import overload
from numba import jit
from scipy.interpolate import CubicSpline
import numpy as np
x = np.arange(10)
y = np.sin(x)
xs = np.arange(-0.5, 9.6, 0.1)
def Spline_interp(xs,x,y):
cs = CubicSpline(x, y)
ds = cs(xs)
return ds
@overload(Spline_interp)
def jit_Spline_interp(xs,x,y):
ds = Spline_interp(xs,x,y)
def jit_Spline_interp_impl(xs,x, y):
return ds
return jit_Spline_interp_impl
@jit(nopython=True)
def main():
# other codes compatible with @njit
ds = Spline_interp(xs,x,y)
# other codes compatible with @njit
return ds
print(main())
如果我对@overload 功能的理解有误,请指正我,以及将此类 Scipy 库与 Numba 一起使用的可能解决方案是什么。
您需要回退到 object-mode(在本地,如 @max9111 建议的那样),或者您自己在 Numba 中实现 CubicSpline
函数。
据我所知,重载装饰器 "only" 让编译器意识到,如果遇到重载函数,它可以使用与 Numba 兼容的实现。它不会神奇地将函数转换为与 Numba 兼容。
有一个包向 Numba 公开了一些 Scipy 功能,但这似乎还很早,目前只包含一些 scipy.special 功能。
这是我的解决方案的转贴,发布在 numba discourse https://numba.discourse.group/t/call-scipy-splev-routine-in-numba-jitted-function/1122/7。
我最初接受了 @max9111 使用 objmode 的建议。它给出了一个临时修复。但是,由于代码对性能至关重要,我最终编写了 scipy 的 'interpolate.splev' 样条插值子例程的 numba 版本。
import numpy as np
import numba
from scipy import interpolate
import matplotlib.pyplot as plt
import time
# Custom wrap of scipy's splrep
def custom_splrep(x, y, k=3):
"""
Custom wrap of scipy's splrep for calculating spline coefficients,
which also check if the data is equispaced.
"""
# Check if x is equispaced
x_diff = np.diff(x)
equi_spaced = all(np.round(x_diff,5) == np.round(x_diff[0],5))
dx = x_diff[0]
# Calculate knots & coefficients (cubic spline by default)
t,c,k = interpolate.splrep(x,y, k=k)
return (t,c,k,equi_spaced,dx)
# Numba accelerated implementation of scipy's splev
@numba.njit(cache=True)
def numba_splev(x, coeff):
"""
Custom implementation of scipy's splev for spline interpolation,
with additional section for faster search of knot interval, if knots are equispaced.
Spline is extrapolated from the end spans for points not in the support.
"""
t,c,k, equi_spaced, dx = coeff
t0 = t[0]
n = t.size
m = x.size
k1 = k+1
k2 = k1+1
nk1 = n - k1
l = k1
l1 = l+1
y = np.zeros(m)
h = np.zeros(20)
hh = np.zeros(19)
for i in range(m):
# fetch a new x-value arg
arg = x[i]
# search for knot interval t[l] <= arg <= t[l+1]
if(equi_spaced):
l = int((arg-t0)/dx) + k
l = min(max(l, k1), nk1)
else:
while not ((arg >= t[l-1]) or (l1 == k2)):
l1 = l
l = l-1
while not ((arg < t[l1-1]) or (l == nk1)):
l = l1
l1 = l+1
# evaluate the non-zero b-splines at arg.
h[:] = 0.0
hh[:] = 0.0
h[0] = 1.0
for j in range(k):
for ll in range(j+1):
hh[ll] = h[ll]
h[0] = 0.0
for ll in range(j+1):
li = l + ll
lj = li - j - 1
if(t[li] != t[lj]):
f = hh[ll]/(t[li]-t[lj])
h[ll] += f*(t[li]-arg)
h[ll+1] = f*(arg-t[lj])
else:
h[ll+1] = 0.0
break
sp = 0.0
ll = l - 1 - k1
for j in range(k1):
ll += 1
sp += c[ll]*h[j]
y[i] = sp
return y
######################### Testing and comparison #############################
# Generate a data set for interpolation
x, dx = np.linspace(10,100,200, retstep=True)
y = np.sin(x)
# Calculate the cubic spline spline coeff's
coeff_1 = interpolate.splrep(x,y, k=3) # scipy's splrep
coeff_2 = custom_splrep(x,y, k=3) # Custom wrap of scipy's splrep
# Generate data for interpolation and randomize
x2 = np.linspace(0,110,10000)
np.random.shuffle(x2)
# Interpolate
y2 = interpolate.splev(x2, coeff_1) # scipy's splev
y3 = numba_splev(x2, coeff_2) # Numba accelerated implementation of scipy's splev
# Plot data
plt.plot(x,y,'--', linewidth=1.0,color='green', label='data')
plt.plot(x2,y2,'o',color='blue', markersize=2.0, label='scipy splev')
plt.plot(x2,y3,'.',color='red', markersize=1.0, label='numba splev')
plt.legend()
plt.show()
print("\nTime for random interpolations")
# Calculation time evaluation for scipy splev
t1 = time.time()
for n in range(0,10000):
y2 = interpolate.splev(x2, coeff_1)
print("scipy splev", time.time() - t1)
# Calculation time evaluation for numba splev
t1 = time.time()
for n in range(0,10000):
y2 = numba_splev(x2, coeff_2)
print("numba splev",time.time() - t1)
print("\nTime for non random interpolations")
# Generate data for interpolation without randomize
x2 = np.linspace(0,110,10000)
# Calculation time evaluation for scipy splev
t1 = time.time()
for n in range(0,10000):
y2 = interpolate.splev(x2, coeff_1)
print("scipy splev", time.time() - t1)
# Calculation time evaluation for numba splev
t1 = time.time()
for n in range(0,10000):
y2 = numba_splev(x2, coeff_2)
print("numba splev",time.time() - t1)
如果节点是等距的,上面的代码针对更快的节点搜索进行了优化。 在我的 corei7 机器上,如果插值是随机值完成的,numba 版本更快,
Scipy的splev = 0.896s Numba splev = 0.375s
如果插值不是随机值scipy的版本更快,
Scipy 的 splev = 0.281s Numba splev = 0.375s
参考:https://github.com/scipy/scipy/tree/v1.7.1/scipy/interpolate/fitpack, https://github.com/dbstein/fast_splines
使用 ctypes (Numba) 包装编译函数
特别是对于更复杂的功能,重新实现 numba-compileable Python 代码中的所有内容可能需要大量工作,而且有时会更慢。以下答案将直接从共享对象或动态库调用 C-like 函数。
编译 fortran 例程
此示例将展示在 windows 上执行此操作的方法,但在其他 os 上应该是直截了当的。对于定义 ISO_C_BINDING 的可移植接口,强烈推荐。在这个答案中,我将在没有界面的情况下尝试。
dll.def
EXPORTS
SPLEV @1
编译
ifort /dll dll.def splev.f fpbspl.f /O3 /fast
直接从 Numba 调用此函数
- 看看 Fortran 例程需要什么
- 检查包装器中的每个输入(数据类型、连续性)。您只需提供一些指向 fortran 函数的指针。没有额外的安全检查。
包装器
下面的代码展示了调用这个函数的两种方式。在 Numba 中,它不是直接 possible to pass scalar by reference。您可以在堆上分配一个数组(对于小函数来说很慢),或者使用内部函数来使用堆栈数组。
import numba as nb
import numpy as np
import ctypes
lib = ctypes.cdll.LoadLibrary("splev.dll")
dble_p=ctypes.POINTER(ctypes.c_double)
int_p =ctypes.POINTER(ctypes.c_longlong)
SPLEV=lib.SPLEV
SPLEV.restype = ctypes.c_void_p
SPLEV.argtypes = (dble_p,int_p,dble_p,int_p,dble_p,dble_p,int_p,int_p,int_p)
from numba import types
from numba.extending import intrinsic
from numba.core import cgutils
@intrinsic
def val_to_ptr(typingctx, data):
def impl(context, builder, signature, args):
ptr = cgutils.alloca_once_value(builder,args[0])
return ptr
sig = types.CPointer(nb.typeof(data).instance_type)(nb.typeof(data).instance_type)
return sig, impl
@intrinsic
def ptr_to_val(typingctx, data):
def impl(context, builder, signature, args):
val = builder.load(args[0])
return val
sig = data.dtype(types.CPointer(data.dtype))
return sig, impl
#with intrinsics, temporary arrays are allocated on stack
#faster but much more relevant for functions with very low runtime
@nb.njit()
def splev_wrapped(x, coeff,e):
#There are just pointers passed to the fortran function.
#The arrays have to be contiguous!
t=np.ascontiguousarray(coeff[0])
x=np.ascontiguousarray(x)
c=coeff[1]
k=coeff[2]
y=np.empty(x.shape[0],dtype=np.float64)
n_arr=val_to_ptr(nb.int64(t.shape[0]))
k_arr=val_to_ptr(nb.int64(k))
m_arr=val_to_ptr(nb.int64(x.shape[0]))
e_arr=val_to_ptr(nb.int64(e))
ier_arr=val_to_ptr(nb.int64(0))
SPLEV(t.ctypes,n_arr,c.ctypes,k_arr,x.ctypes,
y.ctypes,m_arr,e_arr,ier_arr)
return y, ptr_to_val(ier_arr)
#without using intrinsics
@nb.njit()
def splev_wrapped_2(x, coeff,e):
#There are just pointers passed to the fortran function.
#The arrays have to be contiguous!
t=np.ascontiguousarray(coeff[0])
x=np.ascontiguousarray(x)
c=coeff[1]
k=coeff[2]
y=np.empty(x.shape[0],dtype=np.float64)
n_arr = np.empty(1, dtype=np.int64)
k_arr = np.empty(1, dtype=np.int64)
m_arr = np.empty(1, dtype=np.int64)
e_arr = np.empty(1, dtype=np.int64)
ier_arr = np.zeros(1, dtype=np.int64)
n_arr[0]=t.shape[0]
k_arr[0]=k
m_arr[0]=x.shape[0]
e_arr[0]=e
SPLEV(t.ctypes,n_arr.ctypes,c.ctypes,k_arr.ctypes,x.ctypes,
y.ctypes,m_arr.ctypes,e_arr.ctypes,ier_arr.ctypes)
return y, ier_arr[0]