如何直接汇总 python 多处理获得的结果而不返回所有单个结果以节省内存?

How to directly sum up results obtained with python multiprocessing and not returning all individual results to save memory?

我有一个函数可以创建一个大掩码(布尔数组)。我想多次调用此函数并创建一个形状相同的总掩码,该掩码在任何单个掩码中为真的索引处为真。

由于掩码的计算需要很多时间,我已经将它并行化,但该函数现在消耗大量内存,因为我首先创建所有单独的掩码然后组合它们,这意味着我必须存储所有 ~40.000个人面具。在使用多处理计算下一个掩码之前,是否可以直接将返回的单个掩码添加到总掩码中?

这是问题的示例代码:

import numpy as np
from multiprocessing import Pool


def return_something(seed):
    np.random.seed(seed)
    return np.random.choice([True, False], size=shape, p=[0.1, 0.9])


shape = (50, 50)
ncores = 4
seeds = np.random.randint(low=0, high=np.iinfo(np.int32).max, size=10)

# Without parallelisation, very slow:
mask = np.zeros(shape, dtype=bool)
for seed in seeds:
    mask |= return_something(seed)


# With parallelisation, takes too much memory
p = Pool(ncores)
mask_parallel = np.any(list(p.imap(return_something, seeds)), axis=0)

我觉得我对 (i)map 函数的了解还不够。我知道 multiprocessing.imap returns 一个生成器,并且可以使用带有以下代码的 tqdm 显示例如进度条:

list(tqdm.tqdm(p.imap(fct, inputs), total=len(inputs))

由于进度条在多处理过程中更新 运行 我认为一定有可能在 运行 期间已经访问结果并可能对它们进行总结,但我不知道如何。

感谢您的帮助!

遍历种子是没有意义的,因为您每次都在 return_somethign 中创建一个非常大的数组。因此,您必须将此数组创建切片为一些子创建并遍历这些子创建。 Pool.map() 方法 returns 每次迭代中执行函数的结果列表。向您展示针对您的案例的一般实施。我正在做的只是并行化每一行的创建并通过 map() 函数将它们放在一起。

import numpy as np
import multiprocessing as mp

def return_something(i):
    mask = np.random.choice([True, False], size=(shape[0],), p=[0.1, 0.9])
    return mask

shape = (5000, 5000)

if __name__ == "__main__":
    pool = mp.Pool(mp.cpu_count())
    results = pool.map(return_something, [i for i in range(shape[1])])
    pool.close()
    print(len(results))

关于您的评论,我正在展示一种方法,可以在计算结果项后(即时)将它们附加到列表中

import numpy as np
from multiprocessing import Pool
import time

def return_something(seed):
    np.random.seed(seed)
    return np.random.choice([True, False], size=shape, p=[0.1, 0.9])


shape = (50, 50)
ncores = 4
seeds = np.random.randint(low=0, high=np.iinfo(np.int32).max, size=100000)

mask = []

if __name__ == "__main__":
    p = Pool(12)
    start = time.time()
    for res in p.imap(return_something, seeds, chunksize=1):
        mask.append(res)
        print("{} (Time elapsed: {}s)".format(len(res), time.time() - start))

    p.close()
    print(len(mask))