在 pd.concat 中并行化 for 循环
Parallelize for loop in pd.concat
我需要根据不完全匹配的字符串列合并两个大型数据集。我有广泛的数据集,可以帮助我比单独的字符串距离更准确地确定最佳匹配,但我首先需要 return 几个 'top matches' 每个字符串。
可重现的例子:
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, limit = n)
df2_index = [i[2] for i in tup]
scores = [i[1] for i in tup]
return pd.DataFrame({
"df1_index": [idx] * n,
"df2_index": df2_index,
"score": scores
})
import pandas as pd
from fuzzywuzzy import process
s1 = pd.Series(["two apples", "one orange", "my banana", "red grape", "huge kiwi"])
s2 = pd.Series(["a couple of apples", "old orange", "your bananas", "purple grape", "tropical fruit"])
pd.concat([example_function(index, value, s2, 2) for index, value in s1.items()]).reset_index()
我未能成功并行化此函数。似乎最接近我正在尝试做的是 multiprocessing implementation, but even with starmap 我没有得到结果。我想有一种简单的方法可以实现这一点,但还没有找到有效的方法。
我愿意接受有关如何优化我的代码的任何建议,但并行处理在这种情况下是一个合适的解决方案,因为它看起来需要大约 4-5 个小时(事后看来,这是一个慷慨的估计) 如果按顺序完成。
更新
感谢您提供解决方案。我有一个 7,000 行的 df1 和一个 70,000 行的 df2。对于下面的结果,我从 df2 中搜索了所有 70,000 行,以查找 df1 的前 20 行中的每一行。
- dataframe concat 方法(原始方法):96 秒
- 字典链法:90秒
- 与 dask 并行添加(4 名工人):77 秒
- 使用 rapidfuzz 而不是 fuzzywuzzy:6.73 秒
- 在 dask 中使用 rapidfuzz(4 名工人):5.29 秒
优化代码如下:
from dask.distributed import Client
from dask import delayed
from rapidfuzz import process, fuzz
from itertools import chain
client = Client(n_workers = 4, processes = False)
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, scorer = fuzz.WRatio, limit = n)
return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]
jobs = [delayed(example_function)(index, value, t3, 20) for index, value in t1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
print(df)
client.close()
并行处理并没有达到我预期的效果。也许它没有针对此功能进行理想设置,或者随着我包含更多迭代,它可能会继续扩展以产生更大的影响。无论哪种方式,它确实有所作为,所以我在我的个人解决方案中使用它。谢谢大家
这没有回答您的问题,但我很想知道它是否能加快速度。只返回字典而不是 DataFrame 应该更有效率:
from itertools import chain
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, limit = n)
return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]
data = chain(*(example_function(index, value, s2, 2) for index, value in s1.items()))
df = pd.DataFrame.from_records(data)
print(df)
输出:
idx index2 score
0 0 0 86
1 0 3 43
2 1 1 80
3 1 0 45
4 2 2 76
5 2 1 32
6 3 3 76
7 3 1 53
8 4 0 30
9 4 3 29
更新:这是您问题的一个答案:
Dask 提供了一种非常方便的方法来并行执行 Python:
from dask.distributed import Client
from dask import delayed
from itertools import chain
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, limit = n)
return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]
client = Client(n_workers=4) # Choose number of cores
jobs = [delayed(example_function)(index, value, s2, 2) for index, value in s1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
client.close()
在大批量数据上试试这个,看看它是否加快了执行速度(我在这个小样本上试过,它并没有更快)。
我需要根据不完全匹配的字符串列合并两个大型数据集。我有广泛的数据集,可以帮助我比单独的字符串距离更准确地确定最佳匹配,但我首先需要 return 几个 'top matches' 每个字符串。
可重现的例子:
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, limit = n)
df2_index = [i[2] for i in tup]
scores = [i[1] for i in tup]
return pd.DataFrame({
"df1_index": [idx] * n,
"df2_index": df2_index,
"score": scores
})
import pandas as pd
from fuzzywuzzy import process
s1 = pd.Series(["two apples", "one orange", "my banana", "red grape", "huge kiwi"])
s2 = pd.Series(["a couple of apples", "old orange", "your bananas", "purple grape", "tropical fruit"])
pd.concat([example_function(index, value, s2, 2) for index, value in s1.items()]).reset_index()
我未能成功并行化此函数。似乎最接近我正在尝试做的是 multiprocessing implementation, but even with starmap 我没有得到结果。我想有一种简单的方法可以实现这一点,但还没有找到有效的方法。
我愿意接受有关如何优化我的代码的任何建议,但并行处理在这种情况下是一个合适的解决方案,因为它看起来需要大约 4-5 个小时(事后看来,这是一个慷慨的估计) 如果按顺序完成。
更新
感谢您提供解决方案。我有一个 7,000 行的 df1 和一个 70,000 行的 df2。对于下面的结果,我从 df2 中搜索了所有 70,000 行,以查找 df1 的前 20 行中的每一行。
- dataframe concat 方法(原始方法):96 秒
- 字典链法:90秒
- 与 dask 并行添加(4 名工人):77 秒
- 使用 rapidfuzz 而不是 fuzzywuzzy:6.73 秒
- 在 dask 中使用 rapidfuzz(4 名工人):5.29 秒
优化代码如下:
from dask.distributed import Client
from dask import delayed
from rapidfuzz import process, fuzz
from itertools import chain
client = Client(n_workers = 4, processes = False)
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, scorer = fuzz.WRatio, limit = n)
return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]
jobs = [delayed(example_function)(index, value, t3, 20) for index, value in t1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
print(df)
client.close()
并行处理并没有达到我预期的效果。也许它没有针对此功能进行理想设置,或者随着我包含更多迭代,它可能会继续扩展以产生更大的影响。无论哪种方式,它确实有所作为,所以我在我的个人解决方案中使用它。谢谢大家
这没有回答您的问题,但我很想知道它是否能加快速度。只返回字典而不是 DataFrame 应该更有效率:
from itertools import chain
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, limit = n)
return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]
data = chain(*(example_function(index, value, s2, 2) for index, value in s1.items()))
df = pd.DataFrame.from_records(data)
print(df)
输出:
idx index2 score
0 0 0 86
1 0 3 43
2 1 1 80
3 1 0 45
4 2 2 76
5 2 1 32
6 3 3 76
7 3 1 53
8 4 0 30
9 4 3 29
更新:这是您问题的一个答案:
Dask 提供了一种非常方便的方法来并行执行 Python:
from dask.distributed import Client
from dask import delayed
from itertools import chain
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, limit = n)
return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]
client = Client(n_workers=4) # Choose number of cores
jobs = [delayed(example_function)(index, value, s2, 2) for index, value in s1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
client.close()
在大批量数据上试试这个,看看它是否加快了执行速度(我在这个小样本上试过,它并没有更快)。