如何将 joblib 并行化与不 return 任何东西的 in-class 方法一起使用

How to use joblib parallelization with in-class methods that don't return anything

我目前正尝试在 python 3.8.3.

中使用 joblib 实现 parallel for 循环

for 循环中,我想将 class 方法应用于一个 class 的实例,同时在另一个中应用方法。 这是我尝试查看我的想法是否可行的 MWE,但事实并非如此。有谁知道如何让它工作吗?

from joblib import Parallel, delayed

class A():
    def __init__(self):
        self.val = 0
    def add5(self):
        self.val += 5

class B():
    def __init__(self):
        self.obj = [A() for _ in range(10)]
    def apply(self):
        """ this is where I'm trying to use joblib:
        for a in self.obj:
            a.add5()"""

        def f(x):
            x.add5()
        Parallel(n_jobs=-1)(delayed(f)(x) for x in self.obj)
    def prnt(self):
        print([a.val for a in self.obj])

b = B()
b.prnt()  # returns [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
b.apply()
b.prnt()  # returns [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] but
          # I expect [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]


关于我的问题的更多背景信息:我正在使用 sci-kit learn 来实现增强算法,在应用算法之前生成弱学习器。拟合和预测是在 for 循环中完成的,可能需要一些时间,所以我想添加并行化来尝试加快进度。基本上,class A 是一个 classifier,class B 是我的算法,我想在其中安装我生成的所有 classifier。

来自 (source) 可以读到:

The default backend of joblib will run each function call in isolated Python processes, therefore they cannot mutate a common Python object defined in the main program.

However if the parallel function really needs to rely on the shared memory semantics of threads, it should be made explicit with require='sharedmem', for instance:

所以您有两个选择:1) 您将 require='sharedmem' 添加到您的 Parallel 对于:

Parallel(n_jobs=-1, require='sharedmem')(delayed(f)(x) for x in self.obj)

然而,source指出:

Keep in mind that relying a on the shared-memory semantics is probably suboptimal from a performance point of view as concurrent access to a shared Python object will suffer from lock contention.

在 2) 选项中,您必须更改代码中的两处内容。

先把f函数改成:

 def f(x):
      x.add5()

向return对象返回。

def f(x):
    x.add5()
    return x

并在 Parallel loop 中更改为:

  Parallel(n_jobs=-1)(delayed(f)(x) for x in self.obj)

进入:

 self.obj = Parallel(n_jobs=-1)(delayed(f)(x) for x in self.obj)

以便您可以通过并行循环将 self.obj 分配给列表 return。

最终代码:

from joblib import Parallel, delayed


class A:
    def __init__(self):
        self.val = 0

    def add5(self):
        self.val += 5


class B:
    def __init__(self):
        self.obj = [A() for _ in range(10)]

    def apply(self):
        """ this is where I'm trying to use joblib:
        for a in self.obj:
            a.add5()"""

        def f(x):
            x.add5()
            return x

        self.obj = Parallel(n_jobs=-1)(delayed(f)(x) for x in self.obj)

    def prnt(self):
        print([a.val for a in self.obj])


b = B()
b.prnt()  # returns [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
b.apply()
b.prnt()  # returns [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] but
          # I expect [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]