Python itertools with multiprocessing - 庞大的列表与迭代器的低效 CPU 使用
Python itertools with multiprocessing - huge list vs inefficient CPUs usage with iterator
我处理 n 个元素(在下面命名为 "pair"),并将重复用作我函数的参数。显然,只要 "r" 列表没有大到足以消耗所有内存,一切都可以正常工作。问题是我最终必须为 6 个元素重复 16 次以上。为此,我在云中使用 40 核系统。
代码如下所示:
if __name__ == '__main__':
pool = Pool(39)
r = itertools.product(pairs,repeat=16)
pool.map(f, r)
我相信我应该使用迭代器而不是预先创建巨大的列表,问题就在这里开始了..
我尝试使用以下代码解决问题:
if __name__ == '__main__':
pool = Pool(39)
for r in itertools.product(pairs,repeat=14):
pool.map(f, r)
内存问题消失了,但 CPU 使用率大约为每个内核 5%。现在代码的单核版本比这个更快。
如果你能指导我一点,我将不胜感激..
谢谢。
第二个代码示例较慢,因为您要将一对提交到 39 件作品。只有一名工作人员会处理您的请求,而其他 38 名工作人员将什么都不做!会更慢,因为您将在从主线程到工作进程的管道数据中产生开销。
您可以 "buffer" 一些对,然后执行这组对来平衡内存使用,但仍然可以利用多进程环境。
import itertools
from multiprocessing import Pool
def foo(x):
return sum(x)
cpus = 3
pool = Pool(cpus)
# 10 is buffer size multiplier - the number of pair that each process will get
buff_size = 10*cpus
buff = []
for i, r in enumerate(itertools.product(range(20), range(10))):
if (i % buff_size) == (buff_size-1):
print pool.map(foo, buff)
buff = []
else:
buff.append(r)
if len(buff) > 0:
print pool.map(foo, buff)
buff = []
上面的输出将如下所示
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 5, 6, 7, 8, 9, 10, 11, 12, 13]
[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 8, 9, 10, 11, 12, 13, 14, 15, 16]
[9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 14, 15, 16, 17, 18, 19, 20, 21, 22]
[15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 17, 18, 19, 20, 21, 22, 23, 24, 25]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
使用 buff_size
乘数来获得适合您系统的平衡!
您的原始代码并未在您自己的代码中预先创建 list
(itertools.product
returns 生成器),但 pool.map
正在实现整个生成器(因为它假定如果您可以存储所有输出,那么您也可以存储所有输入)。
这里不要使用pool.map
。如果您需要有序的结果,请使用 pool.imap
,或者如果结果顺序不重要,请使用 pool.imap_unordered
。迭代任一调用的结果(不要包装在 list
中),并在结果出现时对其进行处理,内存应该不是问题:
if __name__ == '__main__':
pool = Pool(39)
for result in pool.imap(f, itertools.product(pairs, repeat=16)):
print(result)
如果您使用 pool.map
作为副作用,那么您只需要 运行 它完成即可,但结果和顺序无关紧要,您可以通过使用imap_unordered
并使用 collections.deque
有效地耗尽 "results" 而无需实际存储任何东西(deque
和 maxlen
的 0
是最快、最低的内存强制迭代器 运行 完成而不存储结果的方法:
from collections import deque
if __name__ == '__main__':
pool = Pool(39)
deque(pool.imap_unordered(f, itertools.product(pairs, repeat=16)), 0)
最后,我有点怀疑指定 39 Pool
个工人; multiprocessing
在很大程度上有利于 CPU 绑定任务;如果您使用的 worker 数量超过 CPU 个内核并获得收益,则 multiprocessing
可能会在 IPC 中花费您比获得的更多,并且使用更多的 worker 只是通过缓冲来掩盖问题更多数据。
如果您的工作主要 I/O 绑定,您可以尝试使用基于线程的池,这将避免 pickling 和 unpickling 的开销,以及父进程和子进程之间的 IPC 成本。与基于进程的池不同,Python 线程受制于 GIL 问题,因此您的 CPU 绑定在 Python 中工作(不包括 GIL 释放对 I/O、ctypes
调用 .dll/.so 文件,某些第三方扩展如 numpy
释放 GIL 用于繁重的 CPU 工作)仅限于单个内核(并且在 Python 2.x 对于 CPU 绑定工作,您通常会浪费大量解决 GIL 争用和执行上下文切换的工作;Python 3 消除了大部分浪费)。但是如果你的工作主要是 I/O 绑定,阻塞 I/O 释放 GIL 以允许其他线程 运行,所以你可以有很多线程,只要它们中的大多数延迟 [=] 51=]。切换也很容易(只要您没有将程序设计为依赖于每个工作人员的单独地址空间,假设您可以写入 "shared" 状态并且不影响其他工作人员或父进程),只需更改:
from multiprocessing import Pool
至:
from multiprocessing.dummy import Pool
你会得到 multiprocessing.dummy
版本的池,基于线程而不是进程。
我处理 n 个元素(在下面命名为 "pair"),并将重复用作我函数的参数。显然,只要 "r" 列表没有大到足以消耗所有内存,一切都可以正常工作。问题是我最终必须为 6 个元素重复 16 次以上。为此,我在云中使用 40 核系统。
代码如下所示:
if __name__ == '__main__':
pool = Pool(39)
r = itertools.product(pairs,repeat=16)
pool.map(f, r)
我相信我应该使用迭代器而不是预先创建巨大的列表,问题就在这里开始了..
我尝试使用以下代码解决问题:
if __name__ == '__main__':
pool = Pool(39)
for r in itertools.product(pairs,repeat=14):
pool.map(f, r)
内存问题消失了,但 CPU 使用率大约为每个内核 5%。现在代码的单核版本比这个更快。
如果你能指导我一点,我将不胜感激..
谢谢。
第二个代码示例较慢,因为您要将一对提交到 39 件作品。只有一名工作人员会处理您的请求,而其他 38 名工作人员将什么都不做!会更慢,因为您将在从主线程到工作进程的管道数据中产生开销。
您可以 "buffer" 一些对,然后执行这组对来平衡内存使用,但仍然可以利用多进程环境。
import itertools
from multiprocessing import Pool
def foo(x):
return sum(x)
cpus = 3
pool = Pool(cpus)
# 10 is buffer size multiplier - the number of pair that each process will get
buff_size = 10*cpus
buff = []
for i, r in enumerate(itertools.product(range(20), range(10))):
if (i % buff_size) == (buff_size-1):
print pool.map(foo, buff)
buff = []
else:
buff.append(r)
if len(buff) > 0:
print pool.map(foo, buff)
buff = []
上面的输出将如下所示
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 5, 6, 7, 8, 9, 10, 11, 12, 13]
[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 8, 9, 10, 11, 12, 13, 14, 15, 16]
[9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 14, 15, 16, 17, 18, 19, 20, 21, 22]
[15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 17, 18, 19, 20, 21, 22, 23, 24, 25]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
使用 buff_size
乘数来获得适合您系统的平衡!
您的原始代码并未在您自己的代码中预先创建 list
(itertools.product
returns 生成器),但 pool.map
正在实现整个生成器(因为它假定如果您可以存储所有输出,那么您也可以存储所有输入)。
这里不要使用pool.map
。如果您需要有序的结果,请使用 pool.imap
,或者如果结果顺序不重要,请使用 pool.imap_unordered
。迭代任一调用的结果(不要包装在 list
中),并在结果出现时对其进行处理,内存应该不是问题:
if __name__ == '__main__':
pool = Pool(39)
for result in pool.imap(f, itertools.product(pairs, repeat=16)):
print(result)
如果您使用 pool.map
作为副作用,那么您只需要 运行 它完成即可,但结果和顺序无关紧要,您可以通过使用imap_unordered
并使用 collections.deque
有效地耗尽 "results" 而无需实际存储任何东西(deque
和 maxlen
的 0
是最快、最低的内存强制迭代器 运行 完成而不存储结果的方法:
from collections import deque
if __name__ == '__main__':
pool = Pool(39)
deque(pool.imap_unordered(f, itertools.product(pairs, repeat=16)), 0)
最后,我有点怀疑指定 39 Pool
个工人; multiprocessing
在很大程度上有利于 CPU 绑定任务;如果您使用的 worker 数量超过 CPU 个内核并获得收益,则 multiprocessing
可能会在 IPC 中花费您比获得的更多,并且使用更多的 worker 只是通过缓冲来掩盖问题更多数据。
如果您的工作主要 I/O 绑定,您可以尝试使用基于线程的池,这将避免 pickling 和 unpickling 的开销,以及父进程和子进程之间的 IPC 成本。与基于进程的池不同,Python 线程受制于 GIL 问题,因此您的 CPU 绑定在 Python 中工作(不包括 GIL 释放对 I/O、ctypes
调用 .dll/.so 文件,某些第三方扩展如 numpy
释放 GIL 用于繁重的 CPU 工作)仅限于单个内核(并且在 Python 2.x 对于 CPU 绑定工作,您通常会浪费大量解决 GIL 争用和执行上下文切换的工作;Python 3 消除了大部分浪费)。但是如果你的工作主要是 I/O 绑定,阻塞 I/O 释放 GIL 以允许其他线程 运行,所以你可以有很多线程,只要它们中的大多数延迟 [=] 51=]。切换也很容易(只要您没有将程序设计为依赖于每个工作人员的单独地址空间,假设您可以写入 "shared" 状态并且不影响其他工作人员或父进程),只需更改:
from multiprocessing import Pool
至:
from multiprocessing.dummy import Pool
你会得到 multiprocessing.dummy
版本的池,基于线程而不是进程。