Python 中的多处理:如何使用回调函数在 "apply_async" 上实现循环 "map_async"

Multiprocessing in Python: how to implement a loop over "apply_async" as "map_async" using a callback function

我想使用 Python 的多处理模块为多个参数组合集成一个微分方程组。因此,系统应该得到集成,并且应该存储参数组合及其索引和其中一个变量的最终值。

虽然当我使用 apply_async 时效果很好 - 这已经比在简单的 for 循环中执行它更快 - 我无法使用 map_async 实现同样的事情,这似乎更快比 apply_async。永远不会调用回调函数,我不知道为什么。谁能解释为什么会发生这种情况以及如何使用 map_async 而不是 apply_async 获得相同的输出?!

这是我的代码:

from pylab import *
import multiprocessing as mp
from scipy.integrate import odeint
import time

#my system of differential equations
def myODE (yn,tvec,allpara):

    (x, y, z) = yn

    a, b = allpara['para']

    dx  = -x + a*y + x*x*y
    dy = b - a*y - x*x*y
    dz = x*y

    return (dx, dy, dz) 

#returns the index of the parameter combination, the parameters and the integrated solution
#this way I know which parameter combination belongs to which outcome in the asynch-case
def runMyODE(yn,tvec,allpara):
    return allpara['index'],allpara['para'],transpose(odeint(myODE, yn, tvec, args=(allpara,)))

#for reproducibility    
seed(0) 

#time settings for integration
dt = 0.01
tmax = 50
tval = arange(0,tmax,dt)

numVar = 3 #number of variables (x, y, z)
numPar = 2 #number of parameters (a, b)
numComb = 5 #number of parameter combinations

INIT = zeros((numComb,numVar)) #initial conditions will be stored here
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here

#create some initial conditions and random parameters
for combi in range(numComb):

    INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0

    PARA[combi,:] = 10*rand(2) #parameter a and b are chosen randomly

#################################using loop over apply####################

#results will be stored in here
asyncResultsApply = []

#my callback function
def saveResultApply(result):
    # storing the index, a, b and the final value of z
    asyncResultsApply.append((result[0], result[1], result[2][2,-1]))

#start the multiprocessing part
pool = mp.Pool(processes=4)
for combi in range(numComb):
    pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:], 'index': combi}), callback=saveResultApply)
pool.close()
pool.join()

for res in asyncResultsApply:
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z

#######################################using map#####################
#the only difference is that the for loop is replaced by a "map_async" call
print "\n\nnow using map\n\n"
asyncResultsMap = []

#my callback function which is never called
def saveResultMap(result):
    # storing the index, a, b and the final value of z
    asyncResultsMap.append((result[0], result[1], result[2][2,-1]))

pool = mp.Pool(processes=4)
pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, {'para': PARA[combi,:], 'index': combi}), range(numComb), callback=saveResultMap)
pool.close()
pool.join()

#this does not work yet
for res in asyncResultsMap:
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z

如果我没理解错的话,它源于一些经常让人们感到困惑的事情。 apply_async 的回调在单个操作之后调用,但 map 也是如此 - 它不会对每个元素调用回调,而是对整个结果调用一次。

您注意到 mapapply_async 快是​​正确的。如果您希望在每个结果之后发生一些事情,有几种方法:

  1. 您可以有效地将回调添加到您希望对每个元素执行的操作,并map使用它。

  2. 您可以在循环中使用imap(或imap_unordered),并在循环体内执行回调。当然,这意味着所有操作都将在父进程中执行,但是作为回调编写的东西的性质意味着这通常不是问题(它往往是廉价函数)。 YMMV.


例如,假设您有函数 fcb,并且您想要 map f on es with cb 每个操作。那么你可以这样做:

def look_ma_no_cb(e):
    r = f(e)
    cb(r)
    return r

p = multiprocessing.Pool()
p.map(look_ma_no_cb, es)

for r in p.imap(f, es):
    cb(r)