Python Joblib Parallel:如何合并每个工人的结果?

Python Joblib Parallel: How to combine results per worker?

上下文

我有一个生成大型二维 numpy 数组(具有固定形状)作为输出的函数。我在 8 个 CPU 上使用 joblibParallelmultiprocessing 后端)调用此函数 1000 次。在作业结束时,我将所有数组按元素相加(使用 np.sum)以生成我感兴趣的单个二维数组。但是,当我尝试这样做时,我 运行 出内存。我假设这是因为 1000 个数组需要存储在 RAM 中,直到最后对它们求和。

问题

有没有办法让每个 worker 在运行时将其数组相加?例如,worker 1 会将数组 2 添加到数组 1,然后在计算数组 3 之前丢弃数组 2,依此类推。这样,在任何时间点,RAM 中最多只能存储 8 个数组(对于 8 个 CPU),最后可以将这些数组相加得到相同的答案。

您事先知道您的论点并且计算时间与实际论点变化不大的事实简化了任务。它允许在开始时为每个工作进程分配完整的作业,并在最后总结结果,就像你建议的那样。

在下面的代码中,每个派生进程都获得所有参数的 "equal"(尽可能多)部分(它的 args_batch),并总结调用目标函数的中间结果自己的结果数组。这些数组最终由父进程汇总。

这里例子中的"delayed"函数不是计算数组的目标函数,而是目标函数(calc_array)得到的处理函数(worker)作为 job 的一部分与一批参数一起传递。

import numpy as np
from itertools import repeat
from time import sleep
from joblib import Parallel, delayed


def calc_array(v):
    """Create an array with specified shape and
    fill it up with value v, then kill some time.

    Dummy target function.
    """
    new_array = np.full(shape=SHAPE, fill_value=v)
    # delay result:
    cnt = 10_000_000
    for _ in range(cnt):
        cnt -= 1

    return new_array


def worker(func, args_batch):
    """Call func with every packet of arguments received and update
    result array on the run.

    Worker function which runs the job in each spawned process.
    """
    results = np.zeros(SHAPE)
    for args_ in args_batch:
        new_array = func(*args_)
        np.sum([results, new_array], axis=0, out=results)

    return results


def main(func, arguments, n_jobs, verbose):

    with Parallel(n_jobs=n_jobs, verbose=verbose) as parallel:

        # bundle up jobs:
        funcs = repeat(func, n_jobs)  # functools.partial seems not pickle-able
        args_batches = np.array_split(arguments, n_jobs, axis=0)
        jobs = zip(funcs, args_batches)

        result = sum(parallel(delayed(worker)(*job) for job in jobs))
        assert np.all(result == sum(range(CALLS_TOTAL)))

    sleep(1)  # just to keep stdout ordered
    print(result)


if __name__ == '__main__':

    SHAPE = (4, 4)  # shape of array calculated by calc_array
    N_JOBS = 8
    CALLS_TOTAL = 100
    VERBOSE = 10
    ARGUMENTS = np.asarray([*zip(range(CALLS_TOTAL))])
    # array([[0], [1], [2], ...]])
    # zip to bundle arguments in a container so we have less code to
    # adapt when feeding a function with multiple parameters

    main(func=calc_array, arguments=ARGUMENTS, n_jobs=N_JOBS, verbose=VERBOSE)