使用 python 的多处理模块分发作业

distribute jobs with python's multiprocessing module

问题描述

我有一些代码刚开始尝试在 Python 3.5 中加速。我正在尝试使用 multiprocessing 模块来完成此操作。这是一个演示我正在尝试做的事情的最小示例。

串行,代码更直接。 Momma_Serial class 中有一个 Baby 对象列表。有时,我们想对每个调用 Baby.evolve() 方法。实际上,会有很多这样的 Baby 对象(本例中只有 100 个)。这就是寻求并行的最初动机。

使整个事情变得复杂的是,程序的顶层通过传递一个函数 pass_this_func() 来说明如何在许多 Baby 对象中的每一个上完成此操作。此函数是 Momma_Serial.evolve_all_elems() 的参数,并传递给此妈妈对象内的所有小婴儿对象。

class Baby:
    def __init__(self, lol):
        self.lol = lol

    def evolve(self, f):
        self.lol = f(self.lol)

def pass_this_func(thing):
    return( 2 * thing )        

class Momma_Serial:
    def __init__(self, num):
        self.my_list = [Baby(i) for i in range(num)]

    def evolve_all_elems(self, the_func):
        for baby in self.my_list:
            baby.evolve(the_func)


momma1 = Momma_Serial(100)
[baby.lol for baby in momma1.my_list]
momma1.evolve_all_elems(pass_this_func)
[baby.lol for baby in momma1.my_list]

这可以正常工作。但它很慢。这是我使用多处理模块重写 Momma class 的尝试。

import multiprocessing as mp

class Momma_MP:
    def __init__(self, num):
        self.my_list = [Baby(i) for i in range(num)]

    def evolve_all_elems(self, the_func):

        num_workers = 2            

        def f(my_obj):
            my_obj.evolve(the_func)

        with mp.Pool(num_workers) as pool:        
            pool.map(f, self.my_list)

那我试试运行吧:

momma2 = Momma_MP(100)
[baby.lol for baby in momma2.my_list]
momma2.evolve_all_elems(pass_this_func) #error comes here
# [baby.lol for baby in momma2.my_list]

但是我得到错误:

AttributeError: Can't pickle local object 'Momma_MP.evolve_all_elems.<locals>.f'

this Whosebug question 的回答指出 "functions are only picklable if they are defined at the top-level of a module." 这个陈述使得它似乎是通过在 Momma_MP class 之外定义一个函数来实现这一点的唯一方法。但我真的不想那样做,因为这会给我的代码带来更多问题。

我的问题##

(稍作编辑) 有什么解决方法吗?假设我无法在 class 之外定义映射函数。还假设 Momma() 没有在 __main__ 顶级脚本环境中实例化。另外,我不想偏离这个程序设计太多,因为我希望所有这些 Baby() 实例都被抽象掉;我不希望实例化实例或与 Momma() 实例交互的 places/programs 不得不担心或知道与 Baby() class 相关的任何事情。这些额外的限制使问题与 here.

的情况略有不同

顺便说一句,以下不会引发错误,但可能会进行一些复制,因为构成的 Baby 对象没有任何反应。

def outside_f(obj):
    obj.evolve(pass_this_func)       

class Momma_MP:
    def __init__(self, num):
        self.my_list = [Baby(i) for i in range(num)]

    def evolve_all_elems(self, the_func):
        num_workers = 2            

        with mp.Pool(num_workers) as pool:        
            pool.map(outside_f, self.my_list)        

momma2 = Momma_MP(100)
[baby.lol for baby in momma2.my_list]
momma2.evolve_all_elems(pass_this_func)
[baby.lol for baby in momma2.my_list] # no change here?

我会尝试给出一个我能找到的其他地方没有涵盖的答案(见我上面的评论)。我假设你有不同类型的妈妈,它们具有不同的 f() 功能。

你可以做一个函数 evolver():

def evolver(baby):
   momma = baby.momma
   momma.evolve(baby)

您需要在 Baby__init__() 中分配 self.momma,将 Momma 实例传递给 Baby

class Baby:
    def __init__(self, lol, momma):
        self.lol = lol
        self.momma = momma

现在您将从 Momma 派生以覆盖 evolve() 方法以专门化 evolve() 函数。

所以现在当你调用 pool.map(evolver, babies) 时,它会将 baby 传递给 evolver(),然后 momma 会向 evolve() baby.

我上面链接的一个答案说你也可以做以下事情:

class Momma:
    evolver = staticmethod(evolver)

...将全局方法放入 class.