使用 Scipy 加速许多粒子的路径计算
Speeding up path calculation of many particles using Scipy
我正在尝试使用 Python 计算许多粒子位置的演变。最终,我将在大约 100000 个时间步长内计算出许多粒子(大约 10000 个)。由于我正在不惜一切代价避免使用 Fortran,因此我正在努力加快这个过程。
我要解的方程是
d X_i/dt = u
d Y_i/dt = v
所以理想情况下,我会使用二维数组:一个在粒子之间变化,另一个在 x
和 y
之间变化。所以如果我有 100 个粒子,我就会有一个 100x2
数组。
问题的一部分是因为scipy.integrate.odeint
只需要一维数组,所以我必须展平我的初始条件,在我的导数函数(RHS_im
)中拆分它,然后展平输出的时候又是它,速度很慢(这占了RHS_im
调用的20%左右)。
我可以用丑陋的方式做到这一点。这是我提出的 MWE
import numpy as np
from scipy.interpolate import RegularGridInterpolator
from scipy.integrate import odeint
x=np.arange(2.5, 500, 5)
y=np.arange(2.5, 500, 5)
X, Y = np.meshgrid(x, y, indexing='xy')
U=np.full_like(X, -0.1)
V=0.2*np.sin(2*np.pi*X/200)
tend=100
dt=1
tsteps=np.arange(0, tend, dt)
#------
# Create interpolation
U_int = RegularGridInterpolator((x, y), U.T)
V_int = RegularGridInterpolator((x, y), V.T)
#------
#------
# Initial conditions
x0=np.linspace(200,300,5)
y0=np.linspace(200,300,5)
#------
#------
# Calculation for many
def RHS_im(XY, t):
X, Y = np.split(XY, 2, axis=0)
pts=np.array([X,Y]).T
return np.concatenate([ U_int(pts), V_int(pts) ])
XY0 = np.concatenate([x0, y0])
XY, info = odeint(RHS_im, XY0, tsteps[:-1], args=(), hmax=dt, hmin=dt, atol=0.1, full_output=True)
X, Y = np.split(XY, 2, axis=1)
有没有办法避免拆分过程?
此外,虽然我选择了odeint
整合这个系统,但我并不承诺这个选择。如果有另一个函数可以更快地执行此操作(即使使用简单的欧拉方案),我会很容易地进行更改。我只是没有找到任何。 (插值方案也是如此,大约是 RHS_im
所用时间的 80%)。
编辑
稍微改进了拆分过程(通过使用 np.split
),但整个程序仍然需要改进。
感谢您对 MWE 的澄清。通过将 U_int
和 V_int
组合成 UV_int
并更改 XY 的布局,我能够使 MWE 更快地达到 运行 大约 50% np.reshape可以用来代替 np.split。这两项更改都有助于就地连续访问内存。
x_grid=np.arange(2.5, 500, 5)
y_grid=np.arange(2.5, 500, 5)
x_mesh, y_mesh = np.meshgrid(x_grid, y_grid, indexing='xy')
u_mesh=(np.full_like(x_mesh, -0.1))
v_mesh=(0.2*np.sin(2*np.pi*x_mesh/200))
uv_mesh = np.stack([u_mesh.T, v_mesh.T], axis=-1)
tend=100
dt=1
tsteps=np.arange(0, tend, dt)
#------
# Create interpolation
UV_int = RegularGridInterpolator((x_grid, y_grid), uv_mesh)
#------
#------
# Initial conditions
npart = 5
x0=np.linspace(200,300,npart)
y0=np.linspace(200,300,npart)
XY_pair = np.reshape(np.column_stack([x0, y0]), (2*npart))
#-----
def RHS_im(XY, t):
return np.reshape(UV_int(np.reshape(XY, (npart, 2))), (npart*2))
XY, info = odeint(RHS_im, XY_pair, tsteps[:-1], args=(), hmax=dt, hmin=dt, atol=0.1, full_output=True)
我正在尝试使用 Python 计算许多粒子位置的演变。最终,我将在大约 100000 个时间步长内计算出许多粒子(大约 10000 个)。由于我正在不惜一切代价避免使用 Fortran,因此我正在努力加快这个过程。
我要解的方程是
d X_i/dt = u
d Y_i/dt = v
所以理想情况下,我会使用二维数组:一个在粒子之间变化,另一个在 x
和 y
之间变化。所以如果我有 100 个粒子,我就会有一个 100x2
数组。
问题的一部分是因为scipy.integrate.odeint
只需要一维数组,所以我必须展平我的初始条件,在我的导数函数(RHS_im
)中拆分它,然后展平输出的时候又是它,速度很慢(这占了RHS_im
调用的20%左右)。
我可以用丑陋的方式做到这一点。这是我提出的 MWE
import numpy as np
from scipy.interpolate import RegularGridInterpolator
from scipy.integrate import odeint
x=np.arange(2.5, 500, 5)
y=np.arange(2.5, 500, 5)
X, Y = np.meshgrid(x, y, indexing='xy')
U=np.full_like(X, -0.1)
V=0.2*np.sin(2*np.pi*X/200)
tend=100
dt=1
tsteps=np.arange(0, tend, dt)
#------
# Create interpolation
U_int = RegularGridInterpolator((x, y), U.T)
V_int = RegularGridInterpolator((x, y), V.T)
#------
#------
# Initial conditions
x0=np.linspace(200,300,5)
y0=np.linspace(200,300,5)
#------
#------
# Calculation for many
def RHS_im(XY, t):
X, Y = np.split(XY, 2, axis=0)
pts=np.array([X,Y]).T
return np.concatenate([ U_int(pts), V_int(pts) ])
XY0 = np.concatenate([x0, y0])
XY, info = odeint(RHS_im, XY0, tsteps[:-1], args=(), hmax=dt, hmin=dt, atol=0.1, full_output=True)
X, Y = np.split(XY, 2, axis=1)
有没有办法避免拆分过程?
此外,虽然我选择了odeint
整合这个系统,但我并不承诺这个选择。如果有另一个函数可以更快地执行此操作(即使使用简单的欧拉方案),我会很容易地进行更改。我只是没有找到任何。 (插值方案也是如此,大约是 RHS_im
所用时间的 80%)。
编辑
稍微改进了拆分过程(通过使用 np.split
),但整个程序仍然需要改进。
感谢您对 MWE 的澄清。通过将 U_int
和 V_int
组合成 UV_int
并更改 XY 的布局,我能够使 MWE 更快地达到 运行 大约 50% np.reshape可以用来代替 np.split。这两项更改都有助于就地连续访问内存。
x_grid=np.arange(2.5, 500, 5)
y_grid=np.arange(2.5, 500, 5)
x_mesh, y_mesh = np.meshgrid(x_grid, y_grid, indexing='xy')
u_mesh=(np.full_like(x_mesh, -0.1))
v_mesh=(0.2*np.sin(2*np.pi*x_mesh/200))
uv_mesh = np.stack([u_mesh.T, v_mesh.T], axis=-1)
tend=100
dt=1
tsteps=np.arange(0, tend, dt)
#------
# Create interpolation
UV_int = RegularGridInterpolator((x_grid, y_grid), uv_mesh)
#------
#------
# Initial conditions
npart = 5
x0=np.linspace(200,300,npart)
y0=np.linspace(200,300,npart)
XY_pair = np.reshape(np.column_stack([x0, y0]), (2*npart))
#-----
def RHS_im(XY, t):
return np.reshape(UV_int(np.reshape(XY, (npart, 2))), (npart*2))
XY, info = odeint(RHS_im, XY_pair, tsteps[:-1], args=(), hmax=dt, hmin=dt, atol=0.1, full_output=True)