Python itertools with multiprocessing - 庞大的列表与迭代器的低效 CPU 使用

Python itertools with multiprocessing - huge list vs inefficient CPUs usage with iterator

我处理 n 个元素(在下面命名为 "pair"),并将重复用作我函数的参数。显然,只要 "r" 列表没有大到足以消耗所有内存,一切都可以正常工作。问题是我最终必须为 6 个元素重复 16 次以上。为此,我在云中使用 40 核系统。

代码如下所示:

if __name__ == '__main__':
  pool = Pool(39)
  r = itertools.product(pairs,repeat=16)
  pool.map(f, r)

我相信我应该使用迭代器而不是预先创建巨大的列表,问题就在这里开始了..

我尝试使用以下代码解决问题:

if __name__ == '__main__':
  pool = Pool(39)
  for r in itertools.product(pairs,repeat=14):
    pool.map(f, r)

内存问题消失了,但 CPU 使用率大约为每个内核 5%。现在代码的单核版本比这个更快。

如果你能指导我一点,我将不胜感激..

谢谢。

第二个代码示例较慢,因为您要将一对提交到 39 件作品。只有一名工作人员会处理您的请求,而其他 38 名工作人员将什么都不做!会更慢,因为您将在从主线程到工作进程的管道数据中产生开销。

您可以 "buffer" 一些对,然后执行这组对来平衡内存使用,但仍然可以利用多进程环境。

import itertools
from multiprocessing import Pool

def foo(x):
    return sum(x)

cpus = 3
pool = Pool(cpus)
# 10 is buffer size multiplier - the number of pair that each process will get
buff_size = 10*cpus  
buff = []
for i, r in enumerate(itertools.product(range(20), range(10))):
    if (i % buff_size) == (buff_size-1):
        print pool.map(foo, buff)
        buff = []
    else:
        buff.append(r)

if len(buff) > 0:
    print pool.map(foo, buff)
    buff = []

上面的输出将如下所示

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 5, 6, 7, 8, 9, 10, 11, 12, 13]
[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 8, 9, 10, 11, 12, 13, 14, 15, 16]
[9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 14, 15, 16, 17, 18, 19, 20, 21, 22]
[15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 17, 18, 19, 20, 21, 22, 23, 24, 25]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]

使用 buff_size 乘数来获得适合您系统的平衡!

您的原始代码并未在您自己的代码中预先创建 listitertools.product returns 生成器),但 pool.map 正在实现整个生成器(因为它假定如果您可以存储所有输出,那么您也可以存储所有输入)。

这里不要使用pool.map。如果您需要有序的结果,请使用 pool.imap,或者如果结果顺序不重要,请使用 pool.imap_unordered。迭代任一调用的结果(不要包装在 list 中),并在结果出现时对其进行处理,内存应该不是问题:

if __name__ == '__main__':
    pool = Pool(39)
    for result in pool.imap(f, itertools.product(pairs, repeat=16)):
        print(result)

如果您使用 pool.map 作为副作用,那么您只需要 运行 它完成即可,但结果和顺序无关紧要,您可以通过使用imap_unordered 并使用 collections.deque 有效地耗尽 "results" 而无需实际存储任何东西(dequemaxlen0 是最快、最低的内存强制迭代器 运行 完成而不存储结果的方法:

from collections import deque

if __name__ == '__main__':
    pool = Pool(39)
    deque(pool.imap_unordered(f, itertools.product(pairs, repeat=16)), 0)

最后,我有点怀疑指定 39 Pool 个工人; multiprocessing 在很大程度上有利于 CPU 绑定任务;如果您使用的 worker 数量超过 CPU 个内核并获得收益,则 multiprocessing 可能会在 IPC 中花费您比获得的更多,并且使用更多的 worker 只是通过缓冲来掩盖问题更多数据。

如果您的工作主要 I/O 绑定,您可以尝试使用基于线程的池,这将避免 pickling 和 unpickling 的开销,以及父进程和子进程之间的 IPC 成本。与基于进程的池不同,Python 线程受制于 GIL 问题,因此您的 CPU 绑定在 Python 中工作(不包括 GIL 释放对 I/O、ctypes 调用 .dll/.so 文件,某些第三方扩展如 numpy 释放 GIL 用于繁重的 CPU 工作)仅限于单个内核(并且在 Python 2.x 对于 CPU 绑定工作,您通常会浪费大量解决 GIL 争用和执行上下文切换的工作;Python 3 消除了大部分浪费)。但是如果你的工作主要是 I/O 绑定,阻塞 I/O 释放 GIL 以允许其他线程 运行,所以你可以有很多线程,只要它们中的大多数延迟 [=] 51=]。切换也很容易(只要您没有将程序设计为依赖于每个工作人员的单独地址空间,假设您可以写入 "shared" 状态并且不影响其他工作人员或父进程),只需更改:

from multiprocessing import Pool

至:

from multiprocessing.dummy import Pool

你会得到 multiprocessing.dummy 版本的池,基于线程而不是进程。