如何在 Python 中并行化此查询(就像 PLINQ)?

How to parallelize this query in Python (just like PLINQ)?

我在 Python 中编写的查询有一些问题(必须将其用于 TensorFlow),它工作正常但速度太慢,因为输入数据集非常大。查询可能需要 5 分钟以上才能完成,检查任务管理器我可以确认它确实 运行 在单核上。

代码如下:

# Assume words is a list of strings
for i, pair in enumerate(sorted(
    ((word, words.count(word))          # Map each word to itself and its count
        for word in set(words)),        # Set of unique words (remove duplicates)
    key=lambda p: p[1],                 # Order by the frequency of each word
    reverse=True)):                     # Descending order - less frequent words last

    # Do stuff with each sorted pair

我在这里所做的只是获取输入列表 words,去除重复项,然后根据单词在输入文本中的频率降序排列。

如果我使用 PLINQ 在 C# 中编写此代码,我会这样做:

var query = words.AsParallel().Distinct()
            .OrderByDescending(w => words.Count(s => s.Equals(w)))
            .Select((w, i) => (w, i));

我找不到一种简单的方法来使用可能的内置库重写 Python 中的并行实现。我看到了一些关于 Pool 扩展的指南,但看起来它只是相当于并行 Select 操作,所以我仍然想念如何实现 DistinctOrderByDescending 并行操作 Python。

是否可以使用内置库来做到这一点,或者是否有常用的第 3 方库来做到这一点?

谢谢!

您当前方法的问题主要基于 for 循环内的 words.count(word)。这意味着您为 set(words) 中的每个唯一单词迭代整个列表,并且只计算一个单词...相反,您可以使用 Counter 并单次遍历您的列表。 Counter 对象是一个字典,您可以将其用作排序中的键,查找频率为 O(1)。在我的示例中,即使是 1000 "words",speed-up 也是戏剧性的......对于更长的输入,我厌倦了等待 timeit 完成 :)

import string
from collections import Counter
import numpy as np # Just to create fake data

# Create some fake data
letters = list(string.ascii_lowercase)
new_words = [''.join(list(np.random.choice(letters, 3, replace=True))) 
             for x in range(1000)]


def original(search_list):
    """ Your current approach """
    for i, pair in enumerate(sorted(
    ((word, search_list.count(word)) 
        for word in set(search_list)),
    key=lambda p: p[1],
    reverse=True)):    
        pass


def new_approach(search_list):
    freq = Counter(search_list)
    search_list = sorted(search_list, key=lambda x: freq[x], reverse=True)
    new_list = []
    checked = set()
    for item in search_list:
        if item not in checked:
            new_list.append(item)
            checked.add(item)

对于 1000 个列表 "words":

%timeit original(new_words)
26.6 ms ± 289 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


%timeit new_approach(new_words)
833 µs ± 30 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

在尝试使用 multiprocessing 之类的方法之前,您应该看看这种新方法是否适合您的需求,因为这可能会增加额外的代码复杂性,一旦您解决了时间复杂性问题,这些代码就不再需要了。

编辑:

正如OP所指出的,我们可以跳过中间列表并通过简单地对Counter对象进行排序来设置:

def new_approach(search_list):
    freq = Counter(search_list)
    search_list = enumerate(sorted(freq, key=lambda x: freq[x], reverse=True))

新时间:

%timeit new_approach(new_words)
438 µs ± 6.31 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)