如何使用 Pandas 中的应用并行化许多(模糊的)字符串比较?

how to parallelize many (fuzzy) string comparisons using apply in Pandas?

我有以下问题

我有一个包含句子的数据框 master,例如

master
Out[8]: 
                  original
0  this is a nice sentence
1      this is another one
2    Whosebug is nice

对于 Master 中的每一行,我使用 fuzzywuzzy 查找另一个 Dataframe slave 以获得最佳匹配。我使用 fuzzywuzzy 因为两个数据帧之间的匹配句子可能会有所不同(额外的字符等)。

例如,slave可以是

slave
Out[10]: 
   my_value                      name
0         2               hello world
1         1           congratulations
2         2  this is a nice sentence 
3         3       this is another one
4         1     Whosebug is nice

这是一个功能齐全、精彩、紧凑的工作示例:)

from fuzzywuzzy import fuzz
import pandas as pd
import numpy as np
import difflib


master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'Whosebug is nice']})


slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'Whosebug is nice'],'my_value': [2,1,2,3,1]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    #use fuzzywuzzy to see how close original and name are
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.ix[slave_df.score.idxmax(),'my_value']

master['my_value'] = master.original.apply(lambda x: helper(x,slave))

100 万美元的问题是:我可以并行化上面的应用代码吗?

毕竟,master 中的每一行都与 slave 中的所有行进行比较(slave 是一个小数据集,我可以将数据的许多副本保存到 RAM 中)。

我不明白为什么我不能 运行 多重比较(即同时处理多行)。

问题:我不知道该怎么做,甚至不知道该怎么做。

非常感谢任何帮助!

您可以将其与 Dask.dataframe 并行化。

>>> dmaster = dd.from_pandas(master, npartitions=4)
>>> dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave), name='my_value'))
>>> dmaster.compute()
                  original  my_value
0  this is a nice sentence         2
1      this is another one         3
2    Whosebug is nice         1

此外,您应该在此处考虑使用线程与进程之间的权衡。您的模糊字符串匹配几乎肯定不会释放 GIL,因此您不会从使用多线程中获得任何好处。但是,使用进程会导致数据序列化并在您的机器上移动,这可能会减慢速度。

您可以通过管理 compute() 方法的 get= 关键字参数来试验使用线程和进程还是分布式系统。

import dask.multiprocessing
import dask.threaded

>>> dmaster.compute(get=dask.threaded.get)  # this is default for dask.dataframe
>>> dmaster.compute(get=dask.multiprocessing.get)  # try processes instead

我正在做类似的事情,我想为您可能偶然发现这个问题的其他人提供更完整的工作解决方案。不幸的是,@MRocklin 在提供的代码片段中有一些语法错误。我不是 Dask 专家,所以我不能对某些性能考虑因素发表评论,但这应该可以完成您的任务,就像@MRocklin 所建议的那样。这是使用 Dask 版本 0.17.2Pandas 版本 0.22.0:

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'Whosebug is nice']})

slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'Whosebug is nice'],'my_value': [1,2,3,4,5]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(),'my_value']

dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))

然后,获取结果(就像在这个解释器会话中一样):

In [6]: dmaster.compute(get=dask.multiprocessing.get)                                             
Out[6]:                                          
                  original  my_value             
0  this is a nice sentence         3             
1      this is another one         4             
2    Whosebug is nice         5    

这些答案基于较旧的 API。一些较新的代码:

dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))
dmaster.compute(scheduler='processes') 

就我个人而言,我会放弃在辅助函数中对 fuzzy_score 的应用调用,而只是在那里执行操作。

您可以使用 these tips 更改调度程序。