共享字典似乎在 Python 多重处理中不起作用

Shared dictionary seems not working in Python multipleprocessing

我试图通过使用 multipleprocessing 和共享 dict 来计算词频。我为一些初始测试写了一个简单的 Python 代码片段:

from multiprocessing import Manager, Pool


def foo(num):
    try:
        d[num] += 1
    except KeyError:
        d[num] = 1


d = Manager().dict()
pool = Pool(processes=2, maxtasksperchild=100)
tasks = [1] * 1001 + [2] * 2000 + [3] * 1300
pool.map_async(foo, tasks)
pool.close()
pool.join()

print len(tasks)
print d

但是,d 中的频率总数与 tasks 中的频率总数不匹配。在我看来 d 没有很好地同步,但我不知道为什么会发生这种情况以及如何解决它。有人可以帮我吗?

这里出现了竞争条件:

try:
    d[num] += 1
except KeyError:
    d[num] = 1

假设任务 1 尝试执行 d[1] += 1,但 d[1] 是空的,因此它得到 KeyError。现在任务 2,在另一个核心上,尝试做 d[1] += 1,但是 d[1] 仍然是空的,所以它也得到一个 KeyError。所以,现在任务 1 和任务 2 都将尝试设置 d[1] = 1,并且它们都会成功,因此 d[1] 现在是 1,并且您失去了 1 个增量。

更糟糕的是,假设在任务 1 开始设置 d[1] = 1 之前,任务 3-10 都在另一个核心上 运行 并一直递增 d[1]9。然后任务 1 进入并将其设置回 1,您已经失去了 9 个增量。


您可能认为只需预初始化 d = {1: 0, 2: 0, 3: 0} 并省略 try/except 即可解决此问题。但这仍然行不通。因为即使 d[1] += 1 也不是原子的。 Python 将其有效地编译成三个独立的操作:tmp = d.__getitem__(1)tmp += 1d.__setitem__(1, tmp).

因此,任务 1 可以从共享字典中获取现有的 0,并将其递增为 1,同时任务 2 已获取现有的 0,并将其递增为 1,现在他们都去存储 1 并且都成功了。而且,您可以再次看到这如何扩展到丢失大量增量而不是一个增量。


对共享数据的任何非原子操作都必须显式同步。这在文档的 Synchronization between processes and Sharing state between processes 中有解释。


这里最简单的修复(虽然显然不是最好的,因为它最终会序列化您的所有访问)是:

from multiprocessing import Manager, Pool, Lock

lock = Lock()

def foo(num):
    with lock:
        try:
            d[num] += 1
        except KeyError:
            d[num] = 1

如果你想变得更有趣并使它更有效率,你将不得不学习共享内存线程和同步;在一个 Whosebug 答案中解释太多了。