如何在 python 中优化多个二维粒子的均方位移?

How to optimize Mean Square Displacement for several particles in two dimensions in python?

我想计算几个粒子的均方位移,定义为:

其中i是粒子的索引,Dt是时间间隔,t是时间,vec(x)是粒子的位置两个维度。我们对所有可能的时间取平均值 t.

我已经成功地用 numpy 实现了它。请注意 pos 是具有三个轴的 np.array(particles, time, coordinate).

import numpy as np
import matplotlib.pyplot as plt
import time

#Initialize data
np.random.seed(1)
nTime = 10**4
nParticles = 3
pos = np.zeros((nParticles, nTime, 2)) #Axis: particles, times, coordinates
for t in range(1, nTime):
    pos[:, t, :] = pos[:, t-1, :] + ( np.random.random((nParticles, 2)) - 0.5)

#MSD calculation
def MSD_direct(pos):
    Dt_r = np.arange(1, pos.shape[1]-1)
    MSD = np.empty((nParticles, len(Dt_r)))
    dMSD = np.empty((nParticles,len(Dt_r)))
    for k, Dt in enumerate(Dt_r):
        SD = np.sum((pos[:, Dt:,:] - pos[:, 0:-Dt,:])**2, axis = -1)
        MSD[:,k] = np.mean( SD , axis = 1)
        dMSD[:,k] = np.std( SD, axis = 1 ) / np.sqrt(SD.shape[1])

    return Dt_r, MSD, dMSD

start_time = time.time()
Dt_r, MSD_d, dMSD_d = MSD_direct(pos)
print("MSD_direct -- Time: %s s\n" % (time.time() - start_time))

#Plots
plt.figure()
for i in range(nParticles):
    plt.plot(pos[i,:,0])    
plt.xlabel('t')
plt.ylabel('x')
plt.savefig('pos_x.png', dpi = 300)

plt.figure()
for i in range(nParticles):
    plt.plot(pos[i,:,1])    
plt.xlabel('t')
plt.ylabel('y')
plt.savefig('pos_y.png', dpi = 300)

plt.figure()
for i in range(nParticles):
    plt.fill_between(Dt_r, MSD_d[i,:]+dMSD_d[i,:], MSD_d[i,:] - dMSD_d[i,:], alpha = 0.5)
    plt.plot(Dt_r, MSD_d[i,:])
plt.xlabel('Dt')
plt.ylabel('MSD')
plt.savefig('MSD.png', dpi = 300)

输出:

MSD_direct -- Time: 7.793087720870972 s

但是,我想尽可能优化此代码仍然 Dt 的循环,我不知道如何删除它并使用 numpy 完全矢量化程序。


我还使用 numba 重写了计算,管理了 factor two 对先前代码的改进。不知是否还有可能进一步完善

import numba as nb
@nb.jit(fastmath=True,parallel=True)
def MSD_numba(pos):
    Dt_r = np.arange(1, pos.shape[1]-1)
    MSD = np.empty((nParticles, len(Dt_r)))
    dMSD = np.empty((nParticles,len(Dt_r)))
    for i in nb.prange(nParticles):  
        for Dt in Dt_r:
            SD = (pos[i, Dt:, 0] - pos[i, 0:-Dt, 0])**2 + (pos[i, Dt:, 1] - pos[i, 0:-Dt, 1])**2
            MSD[i, Dt-1] = np.mean( SD )
            dMSD[i, Dt-1] = np.std( SD ) / np.sqrt(len(SD)) 
    return Dt_r, MSD, dMSD

start_time = time.time()
Dt_r, MSD_n, dMSD_n = MSD_numba(pos)
print("MSD_numba -- Time: %s s" % (time.time() - start_time))
print("MSD_numba -- All close to MSD_direct: %r\n" %(np.allclose(MSD_n, MSD_d) )  )

输出:

MSD_numba -- Time: 4.520232915878296 s
MSD_numba -- All close to MSD_direct: True

注意:此类问题已在多个帖子中提出,但它们使用不同的定义(, Mean squared displacement, Mean square displacement for n-dim matrix python), they do not have an answer (Mean square displacement in Python), they just use one particle (, Mean square displacement of a 1d random walk in python), they use pandas (Vectorized calculation of Mean Square Displacement in Python, )等

采用使用 FFT 变换的 的答案,我设法将此计算速度提高了 两个数量级


任意 n 维数组的广义函数

此函数假定pos为任意n维数组,您只需指定时间轴和坐标(x,y)。它 returns 与 pos 中所有粒子相关的均方位移。

def MSD_fft_ax(pos, axis_time, axis_coord):
    nTime=pos.shape[axis_time]        

    S2 = np.sum (  np.fft.ifft( np.abs(np.fft.fft(pos, n=2*nTime, axis = axis_time))**2, axis = axis_time ).take(range(nTime), axis = axis_time).real, axis = axis_coord )
    
    D=np.square(pos).sum(axis=axis_coord)

    if axis_coord % pos.ndim < axis_time % pos.ndim: axis_time -= 1

    shape_t = [nTime if ax==axis_time % D.ndim else 1 for ax, s in enumerate(D.shape)]
    shape_non_t = [1 if ax==axis_time % D.ndim else s for ax, s in enumerate(D.shape)]

    D=np.append(D, np.zeros( shape_non_t ), axis = axis_time)
    S1 = ( 2 * np.sum(D, axis = axis_time).reshape(shape_non_t) - np.cumsum( np.insert(D.take(np.arange(0,nTime), axis=axis_time), 0, 0, axis = axis_time) + np.flip(D, axis = axis_time), axis = axis_time ) ).take(np.arange(0,nTime), axis = axis_time) 

    MSD = ( S1-2*S2 ) / ( nTime-np.arange(nTime).reshape(shape_t) )

    Dt_r = np.arange(1, nTime-1)
    MSD = MSD.take(Dt_r, axis = axis_time)
    return Dt_r, MSD

start_time = time.time()
Dt_r, MSD_fax = MSD_fft_ax(pos, axis_time = 1, axis_coord=-1)
print("MSD_fft_ax -- Time: %s s" % (time.time() - start_time))
print("MSD_fft_ax -- All close to MSD_direct: %r\n" %(np.allclose(MSD_fax, MSD_d) )  )

输出:

MSD_direct -- Time: 2.1434285640716553 s

MSD_numba -- Time: 1.532573938369751 s
MSD_numba -- All close to MSD_direct: True

MSD_fft_ax -- Time: 0.009054422378540039 s
MSD_fft_ax -- All close to MSD_direct: True

具有以下形状的数组的函数:(particles, time, coordinate)

为了更好地理解,我包括了 pos 是形状数组 (particles, time, coordinate):

的特殊情况
def MSD_fft(pos):
    nTime=pos.shape[1]        

    S2 = np.sum ( np.fft.ifft( np.abs(np.fft.fft(pos, n=2*nTime, axis = -2))**2, axis = -2  )[:,:nTime,:].real , axis = -1 ) / (nTime-np.arange(nTime)[None,:] )

    D=np.square(pos).sum(axis=-1)
    D=np.append(D, np.zeros((pos.shape[0], 1)), axis = -1)
    S1 = ( 2 * np.sum(D, axis = -1)[:,None] - np.cumsum( np.insert(D[:,0:-1], 0, 0, axis = -1) + np.flip(D, axis = -1), axis = -1 ) )[:,:-1] / (nTime - np.arange(nTime)[None,:] )

    MSD = S1-2*S2

    Dt_r = np.arange(1, pos.shape[1]-1)
    MSD = MSD[:,Dt_r]
    return Dt_r, MSD

start_time = time.time()
Dt_r, MSD_f = MSD_fft(pos)
print("MSD_fft -- Time: %s s" % (time.time() - start_time))
print("MSD_fft -- All close to MSD_direct: %r\n" %(np.allclose(MSD_f, MSD_d) )  )

输出:

MSD_fft -- Time: 0.007384061813354492 s
MSD_fft -- All close to MSD_direct: True