python 过滤器+多处理+迭代器延迟加载

python filter + multiprocessing + iterator lazy loading

我有一个二维数组,它生成一个巨大的 (>300GB) 组合列表,所以我想对 itertools.combinations 生成的迭代器进行惰性迭代并并行化此操作。问题是我需要过滤输出,而这不受 Multiprocessing 支持。我现有的解决方法需要将组合列表加载到内存中,但由于列表的大小,这也不起作用。


n_nodes = np.random.randn(10, 100)
cutoff=0.3

def node_combinations(nodes):
    return itertools.combinations(list(range(len(nodes))), 2)    

def pfilter(func, candidates):
    return np.asarray([c for c, keep in zip(candidates, pool.map(func, candidates)) if keep])

def pearsonr(xy: tuple):
    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
    if correlation_coefficient >= cutoff:
            return True
        else:
            return False


edgelist = pfilter(pearsonr, node_combinations(n_nodes))

我正在寻找一种方法,使用带过滤器而不是映射的多处理对大型迭代器进行惰性计算。

以下使用信号量来减慢过度急切的池线程。不是正确的解决方案,因为它没有解决其他问题,例如使用相同池并循环 imap 结果的嵌套循环在任何内部循环作业开始之前就完成了它们的外部循环作业。但它确实限制了内存使用:

def slowdown(n=16):
    s = threading.Semaphore(n)
    def inner(it):
        for item in it:
            s.acquire()
            yield item
    def outer(it):
        for item in it:
            s.release()
            yield item
    return outer, inner

这用于将 pool.imap 包装成这样:

outer, inner = slowdown()
outer(pool.imap(func, inner(candidates)))

Hoxha 的建议很有效 -- 谢谢!

@Dan 问题是即使是空列表也会占用内存,x420 亿对的内存将近 3TB。

这是我的实现:

import more_itertools
import itertools
import multiprocessing as mp
import numpy as np
import scipy
from tqdm import tqdm

n_nodes = np.random.randn(10, 100)
num_combinations = int((int(n_nodes.shape[0]) ** 2) - int(n_nodes.shape[0]) // 2)
cpu_count = 8
cutoff=0.3

def node_combinations(nodes):
    return itertools.combinations(list(range(len(nodes))), 2)    

def edge_gen(xy_iterator: type(itertools.islice)):
    edges = []
    for cand in tqdm(xy_iterator, total=num_combinations//cpu_count)
        if pearsonr(cand):
            edges.append(cand)

def pearsonr(xy: tuple):
    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
    if correlation_coefficient >= cutoff:
            return True
        else:
            return False


slices = more_itertools.distribute(cpu_count), node_combinations(n_nodes))
pool = mp.Pool(cpu_count)
results = pool.imap(edge_gen, slices)
pool.close()
pool.join()