共享字典似乎在 Python 多重处理中不起作用
Shared dictionary seems not working in Python multipleprocessing
我试图通过使用 multipleprocessing
和共享 dict
来计算词频。我为一些初始测试写了一个简单的 Python 代码片段:
from multiprocessing import Manager, Pool
def foo(num):
try:
d[num] += 1
except KeyError:
d[num] = 1
d = Manager().dict()
pool = Pool(processes=2, maxtasksperchild=100)
tasks = [1] * 1001 + [2] * 2000 + [3] * 1300
pool.map_async(foo, tasks)
pool.close()
pool.join()
print len(tasks)
print d
但是,d
中的频率总数与 tasks
中的频率总数不匹配。在我看来 d
没有很好地同步,但我不知道为什么会发生这种情况以及如何解决它。有人可以帮我吗?
这里出现了竞争条件:
try:
d[num] += 1
except KeyError:
d[num] = 1
假设任务 1 尝试执行 d[1] += 1
,但 d[1]
是空的,因此它得到 KeyError
。现在任务 2,在另一个核心上,尝试做 d[1] += 1
,但是 d[1]
仍然是空的,所以它也得到一个 KeyError
。所以,现在任务 1 和任务 2 都将尝试设置 d[1] = 1
,并且它们都会成功,因此 d[1]
现在是 1
,并且您失去了 1 个增量。
更糟糕的是,假设在任务 1 开始设置 d[1] = 1
之前,任务 3-10 都在另一个核心上 运行 并一直递增 d[1]
至 9
。然后任务 1 进入并将其设置回 1
,您已经失去了 9 个增量。
您可能认为只需预初始化 d = {1: 0, 2: 0, 3: 0}
并省略 try
/except
即可解决此问题。但这仍然行不通。因为即使 d[1] += 1
也不是原子的。 Python 将其有效地编译成三个独立的操作:tmp = d.__getitem__(1)
、tmp += 1
、d.__setitem__(1, tmp)
.
因此,任务 1 可以从共享字典中获取现有的 0,并将其递增为 1,同时任务 2 已获取现有的 0,并将其递增为 1,现在他们都去存储 1
并且都成功了。而且,您可以再次看到这如何扩展到丢失大量增量而不是一个增量。
对共享数据的任何非原子操作都必须显式同步。这在文档的 Synchronization between processes and Sharing state between processes 中有解释。
这里最简单的修复(虽然显然不是最好的,因为它最终会序列化您的所有访问)是:
from multiprocessing import Manager, Pool, Lock
lock = Lock()
def foo(num):
with lock:
try:
d[num] += 1
except KeyError:
d[num] = 1
如果你想变得更有趣并使它更有效率,你将不得不学习共享内存线程和同步;在一个 Whosebug 答案中解释太多了。
我试图通过使用 multipleprocessing
和共享 dict
来计算词频。我为一些初始测试写了一个简单的 Python 代码片段:
from multiprocessing import Manager, Pool
def foo(num):
try:
d[num] += 1
except KeyError:
d[num] = 1
d = Manager().dict()
pool = Pool(processes=2, maxtasksperchild=100)
tasks = [1] * 1001 + [2] * 2000 + [3] * 1300
pool.map_async(foo, tasks)
pool.close()
pool.join()
print len(tasks)
print d
但是,d
中的频率总数与 tasks
中的频率总数不匹配。在我看来 d
没有很好地同步,但我不知道为什么会发生这种情况以及如何解决它。有人可以帮我吗?
这里出现了竞争条件:
try:
d[num] += 1
except KeyError:
d[num] = 1
假设任务 1 尝试执行 d[1] += 1
,但 d[1]
是空的,因此它得到 KeyError
。现在任务 2,在另一个核心上,尝试做 d[1] += 1
,但是 d[1]
仍然是空的,所以它也得到一个 KeyError
。所以,现在任务 1 和任务 2 都将尝试设置 d[1] = 1
,并且它们都会成功,因此 d[1]
现在是 1
,并且您失去了 1 个增量。
更糟糕的是,假设在任务 1 开始设置 d[1] = 1
之前,任务 3-10 都在另一个核心上 运行 并一直递增 d[1]
至 9
。然后任务 1 进入并将其设置回 1
,您已经失去了 9 个增量。
您可能认为只需预初始化 d = {1: 0, 2: 0, 3: 0}
并省略 try
/except
即可解决此问题。但这仍然行不通。因为即使 d[1] += 1
也不是原子的。 Python 将其有效地编译成三个独立的操作:tmp = d.__getitem__(1)
、tmp += 1
、d.__setitem__(1, tmp)
.
因此,任务 1 可以从共享字典中获取现有的 0,并将其递增为 1,同时任务 2 已获取现有的 0,并将其递增为 1,现在他们都去存储 1
并且都成功了。而且,您可以再次看到这如何扩展到丢失大量增量而不是一个增量。
对共享数据的任何非原子操作都必须显式同步。这在文档的 Synchronization between processes and Sharing state between processes 中有解释。
这里最简单的修复(虽然显然不是最好的,因为它最终会序列化您的所有访问)是:
from multiprocessing import Manager, Pool, Lock
lock = Lock()
def foo(num):
with lock:
try:
d[num] += 1
except KeyError:
d[num] = 1
如果你想变得更有趣并使它更有效率,你将不得不学习共享内存线程和同步;在一个 Whosebug 答案中解释太多了。