在 pd.concat 中并行化 for 循环

Parallelize for loop in pd.concat

我需要根据不完全匹配的字符串列合并两个大型数据集。我有广泛的数据集,可以帮助我比单独的字符串距离更准确地确定最佳匹配,但我首先需要 return 几个 'top matches' 每个字符串。

可重现的例子:

def example_function(idx, string, comparisons, n):
    tup = process.extract(string, comparisons, limit = n)
    df2_index = [i[2] for i in tup]
    scores = [i[1] for i in tup]
    return pd.DataFrame({
        "df1_index": [idx] * n,
        "df2_index": df2_index,
        "score": scores
    })

import pandas as pd
from fuzzywuzzy import process

s1 = pd.Series(["two apples", "one orange", "my banana", "red grape", "huge kiwi"])
s2 = pd.Series(["a couple of apples", "old orange", "your bananas", "purple grape", "tropical fruit"])

pd.concat([example_function(index, value, s2, 2) for index, value in s1.items()]).reset_index()

我未能成功并行化此函数。似乎最接近我正在尝试做的是 multiprocessing implementation, but even with starmap 我没有得到结果。我想有一种简单的方法可以实现这一点,但还没有找到有效的方法。

我愿意接受有关如何优化我的代码的任何建议,但并行处理在这种情况下是一个合适的解决方案,因为它看起来需要大约 4-5 个小时(事后看来,这是一个慷慨的估计) 如果按顺序完成。

更新

感谢您提供解决方案。我有一个 7,000 行的 df1 和一个 70,000 行的 df2。对于下面的结果,我从 df2 中搜索了所有 70,000 行,以查找 df1 的前 20 行中的每一行。

优化代码如下:

from dask.distributed import Client
from dask import delayed
from rapidfuzz import process, fuzz
from itertools import chain

client = Client(n_workers = 4, processes = False)

def example_function(idx, string, comparisons, n):
    tup = process.extract(string, comparisons, scorer = fuzz.WRatio, limit = n)
    return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]

jobs = [delayed(example_function)(index, value, t3, 20) for index, value in t1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
print(df)

client.close()

并行处理并没有达到我预期的效果。也许它没有针对此功能进行理想设置,或者随着我包含更多迭代,它可能会继续扩展以产生更大的影响。无论哪种方式,它确实有所作为,所以我在我的个人解决方案中使用它。谢谢大家

这没有回答您的问题,但我很想知道它是否能加快速度。只返回字典而不是 DataFrame 应该更有效率:

from itertools import chain

def example_function(idx, string, comparisons, n):
    tup = process.extract(string, comparisons, limit = n)
    return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]

data = chain(*(example_function(index, value, s2, 2) for index, value in s1.items()))
df = pd.DataFrame.from_records(data)
print(df)

输出:

   idx  index2  score
0    0       0     86
1    0       3     43
2    1       1     80
3    1       0     45
4    2       2     76
5    2       1     32
6    3       3     76
7    3       1     53
8    4       0     30
9    4       3     29

更新:这是您问题的一个答案:

Dask 提供了一种非常方便的方法来并行执行 Python:

from dask.distributed import Client
from dask import delayed
from itertools import chain

def example_function(idx, string, comparisons, n):
    tup = process.extract(string, comparisons, limit = n)
    return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]

client = Client(n_workers=4)  # Choose number of cores
jobs = [delayed(example_function)(index, value, s2, 2) for index, value in s1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
client.close()

在大批量数据上试试这个,看看它是否加快了执行速度(我在这个小样本上试过,它并没有更快)。