为什么 pool.map() 和 map() 返回不同的结果?

Why are pool.map() and map() returning varying results?

我有以下程序:

import string
import itertools
import multiprocessing as mp

def test(word_list):
    return list(map(lambda xy: (xy[0], len(list(xy[1]))),
        itertools.groupby(sorted(word_list))))

def f(x):
    return (x[0], len(list(x[1])))

def test_parallel(word_list):
    w = mp.cpu_count()
    pool = mp.Pool(w)
    return (pool.map(f, itertools.groupby(sorted(word_list))))

def main():
    test_list = ["test", "test", "test", "this", "this", "that"]

    print(test(test_list))
    print(test_parallel(test_list))

    return

if __name__ == "__main__":
    main()

输出是:

[('test', 3), ('that', 1), ('this', 2)]
[('test', 0), ('that', 0), ('this', 1)]

第一行是预期的正确结果。我的问题是,为什么 pool.map() 返回的结果与 map() 不同?

此外,我知道 6 项列表不适合多处理。这只是我在大型应用程序中实施时遇到的问题的演示。

我正在使用 Python 3.5.1.

每组

groupby()returns迭代器,并且这些不独立来自传入的底层迭代器. 你不能独立地并行迭代这些组;在您访问下一个群组时,任何前面的群组都会提前结束。

pool.map() 将尝试读取所有 groupby() 迭代器结果以将这些结果发送到单独的函数;仅仅试图获得第二组将导致第一组为空。

您可以在没有 pool.map() 的情况下看到相同的结果,只需迭代到 groupby():

的下一个结果
>>> from itertools import groupby
>>> word_list = ["test", "test", "test", "this", "this", "that"]
>>> iterator = groupby(sorted(word_list))
>>> first = next(iterator)
>>> next(first[1])
'test'
>>> second = next(iterator)
>>> list(first[1])
[]

第一组的剩余部分是 'empty',因为第二组已被请求。

这显然是documented:

Because the source is shared, when the groupby() object is advanced, the previous group is no longer visible.

您必须 'materialise' 每个组 将其发送到函数之前

return pool.map(lambda kg: f((k[0], list(kg[1]))), itertools.groupby(sorted(word_list)))

return pool.map(f, (
    (key, list(group)) for key, group in itertools.groupby(sorted(word_list))))

其中生成器表达式负责在 pool.map() 迭代时具体化。

来自https://docs.python.org/3.5/library/itertools.html#itertools.groupby

The returned group is itself an iterator that shares the underlying iterable with groupby(). Because the source is shared, when the groupby() object is advanced, the previous group is no longer visible. So, if that data is needed later, it should be stored as a list:

groups = []
uniquekeys = []
data = sorted(data, key=keyfunc)
for k, g in groupby(data, keyfunc):
    groups.append(list(g))      # Store group iterator as a list
    uniquekeys.append(k)

我认为这里的问题是 Pool.map 试图切碎它的输入,在这样做的过程中,它遍历了 groupby 的结果,这有效地跳过了除最后一组。

您的代码的一个修复方法是使用 [(k, list(v)) for k, v in itertools.groupby(sorted(word_list))] 之类的东西,但我不知道这对您的实际用例有多适用。