如何在多进程共享数组中从 lmfit 传递 ModelResult class?

How to pass a ModelResult class from lmfit in a multiprocess shared array?

我在不同的进程中使用 lmfit 时遇到一些问题,以使我的代码更快。如何定义一些包含每次拟合结果的共享数组?

我有一个数据立方体,位置为 a、b、x 和 f(x)。我在 lmfit 中制作了一个模型,效果很好,并调整 f(x) 一点,返回一些参数。 Lmfit returns a class called ModelResult() 包含所有这些参数和一些有用的额外数据。所以,我需要 运行 这个适合每个 a 和 b,然后用这个参数和可能的额外数据制作一个立方体。我可以 运行 以线性方式(无需并行化)但我有超过 1000 个点并且模型很复杂,所以需要超过 15000 秒。

当我使用 multiprocessing 库时,我的问题就开始了。我需要在每个进程之间共享数据,所以当一个进程完成时,锁定变量并将结果存储在里面,然后解锁变量。多处理库有 Value()Array() 来做我需要的。我打算使用数组,并对变量进行一些更改以从 a 和 b 传递到 c,其中 c 是 a*b 的范围。但是我无法定义一个数组来为每个 c.

保留 ModelResult()

这里是代码:

import multiprocessing as mp
import numpy as np
import time 
from lmfit import Model
from numpy import sqrt, exp, pi

#Set time zero
start_time = time.time()

#Example of functions to fit
def gaussian(x, amp, cen, wid):
    """1-d gaussian: gaussian(x, amp, cen, wid)"""
    return (amp / (sqrt(2*pi) * wid)) * exp(-(x-cen)**2 / (2*wid**2))

def linear(x, slope, intercept):
    """a linear function"""
    return slope*x + intercept

#Function to fit every point
def fit_point(a,b,data_cube,x,pars,mod):
        pos=int(b+(a*10))
        data_point = np.array(data_cube[:,a,b])
        #print(pos,mp.current_process().name)
        error_point= np.array((data_point*0)+0.002) #Example error
        res_point=mod.fit(data_point, pars, weights=1./error_point, x=x)
        print(res_point.fit_report())
        cube_res[pos]=res_point
        return #cube_res[:]

#Invented some data
x=np.arange(10)
data_cube=np.random.rand(10, 10, 10)

#Example of a model with 2 gaussians and a line
mod = Model(linear, prefix='l_')+Model(gaussian, prefix='g1_')+Model(gaussian, prefix='g2_')
pars= mod.make_params()
pars['g1_amp'].set(0.5)
pars['g2_amp'].set(0.5)
pars['g1_cen'].set(2)
pars['g2_cen'].set(3)
pars['g1_wid'].set(0.5)
pars['g2_wid'].set(0.5)
pars['l_slope'].set(1)
pars['l_intercept'].set(1)

#Definition of the shared Array #Where I think there is the problem!
cube_res = mp.Array('u', 100)

#Definition of the process and starts
processes = []
for a in np.arange(0,10):
    for b in np.arange(0,10):
        process = mp.Process(target=fit_point, args=(a,b,data_cube,x,pars,mod))
        process.start()
        processes.append(process)
for process in processes:
    process.join()

print('Time count')
print("--- %s seconds ---" % (time.time() - start_time))    

#Intent to print some results
#print(cube_res[20].fit_report)

#Final, to recover a,b
#final_cube_res = np.reshape(cube_res, (100,100))

错误是:

TypeError: unicode string expected instead of ModelResult instance

这是因为我定义了mp.Array('u', 100),其中'u'是unicode,100是范围。

我不知道如何定义Array来保存里面的ModelResult。

感谢阅读!

错误消息告诉您不能将 lmfit.ModelResult 放入字符串中。事实上,您的

cube_res = mp.Array('u', 100)

表示 cube_res 是一个包含 100 个 unicode 字符的数组。也就是说,每个结果都需要 1 个字符长。我认为您想要的是使用 multiprocessing.Manger().dict 来保存结果。

cube_res = mp.Manager().dict()

这将允许您使用 (x, y) 位置作为键,然后您可以将 ModelResult 作为值....

...但是:你不能直接保存ModelResult,因为多进程之间共享的数据需要pickle,一般复杂的对象,尤其是有方法的对象,不容易酸洗。好消息是 lmfit.ModelResult 有一个 dumps() 方法,您可以使用该方法将该对象转换为可 picklable json 字符串。那么您的代码将使用

    res_point=mod.fit(data_point, pars, weights=1./error_point, x=x)
    print(res_point.fit_report())
    cube_res[(a,b)] = res_point.dumps()
    return 

在你的 fit_point() 函数中。

我们还没有完成,因为从转储的 json 字符串中恢复 ModelResult 有点复杂(lmfit 有这方面的辅助函数,但这些假定您已经将结果写入一份文件)。在完成 joined 所有流程后,您必须执行此操作:

print('Time count')
print("--- %s seconds ---" % (time.time() - start_time))

import lmfit
# make a dummy ModelResult -- we'll overwrite everything for this
modres = lmfit.model.ModelResult(lmfit.Model(gaussian), lmfit.Parameters())

# make a dictionary of the functions you actually used (the function *names* 
# are included in the dumped string, but not the functions themselves)
funcdefs = {'gaussian': gaussian, 'linear': linear}
for pos, dumpval in cube_res.items():
    modelresult = modres.loads(dumpval, funcdefs=funcdefs)
    print('### Result for fit ', pos)
    print(modelresult.fit_report())

我认为这应该让你开始......