用于在 Python 中更新共享字典的多处理模块

Multiprocessing module for updating a shared dictionary in Python

我正在创建一个字典如下:

y=[(1,2),(2,3),(1,2),(5,6)]

dict={}

for tup in y:
    tup=tuple(sorted(tup))
    if tup in dict.keys():
        dict[tup]=dict[tup]+1
    else:
        dict[tup]=1

然而我的实际 y 包含大约 4000 万个元组,有没有办法使用多处理来加速这个过程?

谢谢

首先,与其在每次迭代中检查 dict.keys 中的 tup 的成员资格,这是一个非常糟糕的主意,您可以使用 collections.defaultdict() 来达到这个目的,这更巨蟒:

from collections import defaultdict
test_dict = defaultdict(lambda:1)

for tup in y:
    tup=tuple(sorted(tup))
    test_dict[tup]=+1

其次,如果你想使用并发,你可能想使用多线程或多处理,但是关于多线程,由于GIL多个线程不能一次执行一个字节码,你不能遍历你的来自双方的元组就像 BDS 算法。

但是对于多处理,您将遇到另一个问题,即从每个内核访问一个共享内存并同时处理它以获取更多信息,请阅读此答案

那么现在有什么窍门呢?

一种方法是将您的列表分成小块,然后使用多线程将您的函数应用于指定部分。

另一种方法是使用协程和子例程,正如那个答案中提到的那样,Greg Ewing great demonstration 介绍了如何使用 yield from 来使用协程来构建诸如调度程序或多角色模拟之类的东西。

编辑: 答案编辑为线程安全

multiprocessing 模块让一切变得简单。

只需重构您的代码即可在函数中完成处理:

def process_tuple(tuples):
    count_dict = {}
    for tuple_ in tuples:
        tuple_=tuple(sorted(tuple_))
        if tuple_ in count_dict:
            count_dict[tuple_] += 1
        else:
            count_dict[tuple_] = 1
    return count_dict

将元组列表分成小组,然后使用 map 处理所有小组。

## 
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

# cut tuples list into 5 chunks
tuples_groups = chunks(tuples, 5)
pool = Pool(5)
count_dict = {}
# processes chunks in parallel
results = pool.map(process_tuple, tuples_groups)
# collect results
for result in results:
    count_dict.update(result)

multiprocessing.Pool 将处理线程之间的分配。

完整示例 + 基准测试:

import time
import random

start_time = time.time()
tuples = []
x,y = (100000, 10)
for i in range(x):
    tuple_ = []
    for j in range(y):
        tuple_.append(random.randint(0, 9))
    tuples.append(tuple(tuple_))

print("--- %s data generated in %s seconds ---" % (x*y, time.time() - start_time))



def process_tuple(tuples):
    count_dict = {}
    for tuple_ in tuples:
        tuple_=tuple(sorted(tuple_))
        if tuple_ in count_dict:
            count_dict[tuple_] += 1
        else:
            count_dict[tuple_] = 1
    return count_dict

from multiprocessing import Pool

start_time = time.time()

## 
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

# cut tuples list into 5 chunks
tuples_groups = chunks(tuples, 5)
pool = Pool(5)
count_dict = {}
# processes chunks in parallel
results = pool.map(process_tuple, tuples_groups)
# collect results
for result in results:
    count_dict.update(result)

print("--- Multithread processed in %s seconds ---" % (time.time() - start_time))    



start_time = time.time()
count_dict = {}
for tuple_ in tuples:
    tuple_=tuple(sorted(tuple_))
    if tuple_ in count_dict:
        count_dict[tuple_] += 1
    else:
        count_dict[tuple_] = 1

print("--- Single thread processed in %s seconds ---" % (time.time() - start_time))

--- 10000000 data generated in 32.7803010941 seconds ---
--- Multithread processed in 1.79116892815 seconds ---
--- Single thread processed in 2.65010404587 seconds ---

因为你想增加计数(而不是简单地创建新的 key/value 对),字典不是线程安全的,除非你在每次更新周围获取一个信号量并在之后释放它 - 所以我不认为您会获得任何整体速度增益,实际上它可能会更慢。

如果您要对其进行线程处理,最好让每个线程更新自己的字典,然后在每个线程完成时合并结果,这样线程安全性就毫无疑问了。然而,因为它很可能是 CPU-bound 你应该使用多处理而不是线程 - 多处理可以利用你所有的 CPU 核心。

此外,如果您使用 collections.Counter,它会为您计数,并支持合并,并且有有用的方法 most_common(n) 到 return 键n 个最高计数。

您可以遵循 MapReduce 方法。

from collections import Counter
from multiprocessing import Pool

NUM_PROCESSES = 8

y = [(1,2),(2,3),(1,2),(5,6)] * 10

## 
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

## map
partial_counters = Pool(NUM_PROCESSES).map(Counter, chunks(y, NUM_PROCESSES))

## reduce
reduced_counter = reduce(Counter.__add__, partial_counters)

## Result is:
## Counter({(1, 2): 20, (5, 6): 10, (2, 3): 10})

想法是:

  1. 将您的输入列表分成块
  2. 将每个块提供给一个单独的进程,该进程将独立计算计数
  3. 通过归约操作将所有部分计数合并在一起。

编辑:使用 chunks(map(frozenset, y), NUM_PROCESSES) 来计算无序对。

如果您想要忽略顺序的计数,请使用 frozenset with Counter:

from collections import Counter

print(Counter(map(frozenset, y)))

使用另一个答案中的 tuples

In [9]: len(tuples)
Out[9]: 500000

In [10]: timeit Counter(map(frozenset, tuples))
1 loops, best of 3: 582 ms per loop

使用 frozenset 将意味着 (1, 2)(2,1) 将被视为相同:

In [12]: y = [(1, 2), (2, 3), (1, 2), (5, 6),(2, 1),(6,5)]

In [13]: from collections import Counter

In [14]: 

In [14]: print(Counter(map(frozenset, y)))
Counter({frozenset({1, 2}): 3, frozenset({5, 6}): 2, frozenset({2, 3}): 1})

如果您使用多处理应用相同的逻辑,它显然会快得多,即使没有它也比使用多处理提供的更快。