Python's multiprocessing: speed up a for-loop for several sets of parameters, "apply" vs. "apply_async"
I want to integrate a system of differential equations for many different parameter combinations and store the final values of the variables that belong to a particular parameter set. I therefore implemented a simple for-loop in which random initial conditions and parameter combinations are created, the system is integrated, and the values of interest are stored in the respective arrays.
Since I intend to do this for many parameter combinations of a fairly complex system (here I only use a toy system for illustration), which may also become stiff, I would like to parallelize the simulations with Python's multiprocessing module to speed things up.
However, when I run the simulations, the for-loop is always faster than its parallelized version. So far, the only way I have found to beat the for-loop is to use apply_async instead of apply. For 10 different parameter combinations I get, for example, the following output (using the code below):
The for loop took 0.11986207962 seconds!
[ 41.75971761 48.06034375 38.74134139 25.6022232 46.48436046
46.34952734 50.9073202 48.26035086 50.05026187 41.79483135]
Using apply took 0.180637836456 seconds!
41.7597176061
48.0603437545
38.7413413879
25.6022231983
46.4843604574
46.3495273394
50.9073202011
48.2603508573
50.0502618731
41.7948313502
Using apply_async took 0.000414133071899 seconds!
41.7597176061
48.0603437545
38.7413413879
25.6022231983
46.4843604574
46.3495273394
50.9073202011
48.2603508573
50.0502618731
41.7948313502
Although the order of the results from apply and apply_async is the same in this example, that does not seem to hold in general. So I would like to use apply_async because it is much faster, but in that case I do not know how to match the simulation results to the parameters/initial conditions used in the respective simulation.
My questions are therefore:
1) Why is apply so much slower than the simple for-loop in this case?
2) When I use apply_async instead of apply, the parallelized version becomes much faster than the for-loop, but how can I then match the simulation results to the parameters used in the respective simulation?
3) In this case, the results of apply and apply_async have the same order. Why is that? Coincidence?
My code can be found below:
from pylab import *
import multiprocessing as mp
from scipy.integrate import odeint
import time
#my system of differential equations
def myODE(yn, tvec, allpara):
    x, y, z = yn
    a, b = allpara['para']
    dx = -x + a*y + x*x*y
    dy = b - a*y - x*x*y
    dz = x*y
    return (dx, dy, dz)
#for reproducibility
seed(0)
#time settings for integration
dt = 0.01
tmax = 50
tval = arange(0,tmax,dt)
numVar = 3 #number of variables (x, y, z)
numPar = 2 #number of parameters (a, b)
numComb = 10 #number of parameter combinations
INIT = zeros((numComb,numVar)) #initial conditions will be stored here
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here
RES = zeros(numComb) #z(tmax) will be stored here
tic = time.time()
for combi in range(numComb):
    INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0
    PARA[combi,:] = 10*rand(2) #parameters a and b are chosen randomly
    allpara = {'para': PARA[combi,:]}
    results = transpose(odeint(myODE, INIT[combi,:], tval, args=(allpara,))) #integrate system
    RES[combi] = results[numVar - 1][-1] #store z
    #INIT[combi,:] = results[:,-1] #update initial conditions
    #INIT[combi,-1] = 0 #set z to 0
toc = time.time()
print 'The for loop took ', toc-tic, 'seconds!'
print RES
#function for the multi-processing part
def runMyODE(yn, tvec, allpara):
    return transpose(odeint(myODE, yn, tvec, args=(allpara,)))
tic = time.time()
pool = mp.Pool(processes=4)
results = [pool.apply(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]})) for combi in range(numComb)]
toc = time.time()
print 'Using apply took ', toc-tic, 'seconds!'
for sol in range(numComb):
    print results[sol][2,-1] #print final value of z
tic = time.time()
resultsAsync = [pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]})) for combi in range(numComb)]
toc = time.time()
print 'Using apply_async took ', toc-tic, 'seconds!'
for sol in range(numComb):
    print resultsAsync[sol].get()[2,-1] #print final value of z
Note that the fact that your apply_async version is 289 times faster than the for-loop is a bit suspicious! And right now you are guaranteed to get the results in the order in which they were submitted, even though that is not what you want for maximum parallelism.
apply_async starts a task; it does not wait until the task is finished. .get() is what does that. So this:
tic = time.time()
resultsAsync = [pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]})) for combi in range(numComb)]
toc = time.time()
is not a very fair measurement; you have started all the tasks, but they have not necessarily finished yet.
On the other hand, once you .get() a result, you know the task has finished and you have the answer; so doing
for sol in range(numComb):
    print resultsAsync[sol].get()[2,-1] #print final value of z
means you will certainly get the results in order (because you are walking through the ApplyResult objects in order and .get()ing them); but you might prefer to receive results as soon as they are ready, instead of blocking on them one at a time. That, however, means you need to label the results with their parameters one way or another.
You can use a callback to save the results once the tasks are finished, and return the parameters together with the results, to allow fully asynchronous returns:
def runMyODE(yn, tvec, allpara):
    return allpara['para'], transpose(odeint(myODE, yn, tvec, args=(allpara,)))

asyncResults = []

def saveResult(result):
    asyncResults.append((result[0], result[1][2,-1]))
tic = time.time()
for combi in range(numComb):
    pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]}), callback=saveResult)
pool.close()
pool.join()
toc = time.time()
print 'Using apply_async took ', toc-tic, 'seconds!'
for res in asyncResults:
    print res[0], res[1]
This gives you more reasonable timings; the results are still almost always in order, because the tasks take very similar amounts of time:
Using apply took 0.0847041606903 seconds!
[ 6.02763376 5.44883183] 41.7597176061
[ 4.37587211 8.91773001] 48.0603437545
[ 7.91725038 5.2889492 ] 38.7413413879
[ 0.71036058 0.871293 ] 25.6022231983
[ 7.78156751 8.70012148] 46.4843604574
[ 4.61479362 7.80529176] 46.3495273394
[ 1.43353287 9.44668917] 50.9073202011
[ 2.64555612 7.74233689] 48.2603508573
[ 0.187898 6.17635497] 50.0502618731
[ 9.43748079 6.81820299] 41.7948313502
Using apply_async took 0.0259671211243 seconds!
[ 4.37587211 8.91773001] 48.0603437545
[ 0.71036058 0.871293 ] 25.6022231983
[ 6.02763376 5.44883183] 41.7597176061
[ 7.91725038 5.2889492 ] 38.7413413879
[ 7.78156751 8.70012148] 46.4843604574
[ 4.61479362 7.80529176] 46.3495273394
[ 1.43353287 9.44668917] 50.9073202011
[ 2.64555612 7.74233689] 48.2603508573
[ 0.187898 6.17635497] 50.0502618731
[ 9.43748079 6.81820299] 41.7948313502
Note that instead of a loop of apply calls you could also use map:
pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, {'para': PARA[combi,:]}), range(numComb), callback=saveResult)