What is the best way to find all occurrences of values from one dataframe in another dataframe?
I'm working on a Spark cluster and I have two dataframes. One contains text. The other is a lookup table. Both tables are huge (M and N could both easily exceed 100,000 entries). What is the best way to match them?
Doing a cross join and then filtering the results down to matches seems like a crazy idea, since I would almost certainly run out of memory.
My dataframes look something like this:
df1:
text
0 i like apples
1 oranges are good
2 eating bananas is healthy
. ...
. ...
M tomatoes are red, bananas are yellow
df2:
fruit_lookup
0 apples
1 oranges
2 bananas
. ...
. ...
N tomatoes
I want the output dataframe to look like this:
output_df:
text extracted_fruits
0 i like apples ['apples']
1 oranges are good ['oranges']
2 eating bananas is healthy ['bananas']
. ...
. ...
M tomatoes are red, bananas are yellow ['tomatoes','bananas']
One way is to use CountVectorizerModel, since a lookup list of 100K words should be manageable for this model (the default vocabSize=262144):
The basic idea is to create the CountVectorizerModel from a custom list built from df2 (the lookup table): split df1.text into an array column, transform that column into a SparseVector, and then map the vector back to words:
Edit: in the split function, the regex was adjusted from \s+ to [\s\p{Punct}]+ so that all punctuation marks are removed. If the lookup is case-insensitive, change 'text' to lower(col('text')).
from pyspark.ml.feature import CountVectorizerModel
from pyspark.sql.functions import split, udf, regexp_replace, lower
df2.show()
+---+------------+
| id|fruit_lookup|
+---+------------+
| 0| apples|
| 1| oranges|
| 2| bananas|
| 3| tomatoes|
| 4|dragon fruit|
+---+------------+
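For reference, the snippets in this answer can be reproduced with a small setup like the following (the sample rows are hypothetical, taken from the example outputs shown in this answer):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# text dataframe matching the example outputs below
df1 = spark.createDataFrame([
    ('I like apples',),
    ('oranges are good',),
    ('eating bananas is healthy',),
    ('tomatoes are red, bananas are yellow',),
    ('test',),
    ('I have dragon fruit and apples in my bag',),
], ['text'])

# lookup table matching the df2.show() output above
df2 = spark.createDataFrame(
    [(0, 'apples'), (1, 'oranges'), (2, 'bananas'), (3, 'tomatoes'), (4, 'dragon fruit')],
    ['id', 'fruit_lookup']
)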
Edit-2: Added the following df1 pre-processing step, which creates an array column containing all N-gram combinations. For each string of L words, N=2 adds (L-1) items to the array, and N=3 adds another (L-2) items on top of that (i.e. (L-1)+(L-2) in total).
# max number of words in a single entry of the lookup table df2
N = 2
# Pre-process the `text` field up to N-grams,
# example: ngram_str('oranges are good', 3)
# --> ['oranges', 'are', 'good', 'oranges are', 'are good', 'oranges are good']
def ngram_str(s_t_r, N):
    arr = s_t_r.split()
    L = len(arr)
    for i in range(2, N+1):
        # stop once the string has fewer words than the n-gram size
        if L - i < 0: break
        arr += [ ' '.join(arr[j:j+i]) for j in range(L-i+1) ]
    return arr
udf_ngram_str = udf(lambda x: ngram_str(x, N), 'array<string>')
df1_processed = df1.withColumn('words_arr', udf_ngram_str(lower(regexp_replace('text', r'[\s\p{Punct}]+', ' '))))
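A quick local sanity check of the n-gram helper (plain Python, no Spark needed):
print(ngram_str('oranges are good', 3))
# ['oranges', 'are', 'good', 'oranges are', 'are good', 'oranges are good']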
Apply the model to the processed df1:
lst = [ r.fruit_lookup for r in df2.collect() ]
model = CountVectorizerModel.from_vocabulary(lst, inputCol='words_arr', outputCol='fruits_vec')
df3 = model.transform(df1_processed)
df3.show(20,40)
#+----------------------------------------+----------------------------------------+-------------------+
#| text| words_arr| fruits_vec|
#+----------------------------------------+----------------------------------------+-------------------+
#| I like apples| [i, like, apples, i like, like apples]| (5,[0],[1.0])|
#| oranges are good|[oranges, are, good, oranges are, are...| (5,[1],[1.0])|
#| eating bananas is healthy|[eating, bananas, is, healthy, eating...| (5,[2],[1.0])|
#| tomatoes are red, bananas are yellow|[tomatoes, are, red, bananas, are, ye...|(5,[2,3],[1.0,1.0])|
#| test| [test]| (5,[],[])|
#|I have dragon fruit and apples in my bag|[i, have, dragon, fruit, and, apples,...|(5,[0,4],[1.0,1.0])|
#+----------------------------------------+----------------------------------------+-------------------+
Then you can use model.vocabulary to map fruits_vec back to the fruit names:
vocabulary = model.vocabulary
#['apples', 'oranges', 'bananas', 'tomatoes', 'dragon fruit']
to_match = udf(lambda v: [ vocabulary[i] for i in v.indices ], 'array<string>')
df_new = df3.withColumn('extracted_fruits', to_match('fruits_vec')).drop('words_arr', 'fruits_vec')
df_new.show(truncate=False)
#+----------------------------------------+----------------------+
#|text |extracted_fruits |
#+----------------------------------------+----------------------+
#|I like apples |[apples] |
#|oranges are good |[oranges] |
#|eating bananas is healthy |[bananas] |
#|tomatoes are red, bananas are yellow |[bananas, tomatoes] |
#|test |[] |
#|I have dragon fruit and apples in my bag|[apples, dragon fruit]|
#+----------------------------------------+----------------------+
Method-2: Since your dataset is not huge by Spark standards, the following might work; per your comment, it also handles lookup values that contain multiple words:
from pyspark.sql.functions import expr, collect_set
# raw string: Spark SQL needs \\b in the literal to yield the regex word boundary \b
df1.alias('d1').join(
      df2.alias('d2')
    , expr(r'd1.text rlike concat("\\b", d2.fruit_lookup, "\\b")')
    , 'left'
).groupby('text') \
 .agg(collect_set('fruit_lookup').alias('extracted_fruits')) \
 .show()
+--------------------+-------------------+
| text| extracted_fruits|
+--------------------+-------------------+
| oranges are good| [oranges]|
| I like apples| [apples]|
|tomatoes are red,...|[tomatoes, bananas]|
|eating bananas is...| [bananas]|
| test| []|
+--------------------+-------------------+
其中:"\\b
是单词边界,这样查找值就不会混淆它们的上下文。
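Without the boundaries, a lookup value such as 'apples' would also match inside a longer word like 'pineapples'. A quick check (hypothetical one-liner, assuming the default Spark SQL string-escaping behavior):
spark.sql(r'''SELECT 'i like pineapples' rlike concat("\\b", 'apples', "\\b") AS matched''').show()
#+-------+
#|matched|
#+-------+
#|  false|
#+-------+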
Note: you may need to clean up all punctuation and redundant whitespace on both columns before the dataframe join.
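A minimal cleanup sketch, reusing the [\s\p{Punct}]+ pattern from Method-1 (lower() assumes a case-insensitive match; drop it otherwise):
from pyspark.sql.functions import regexp_replace, trim, lower

# collapse runs of punctuation/whitespace into single spaces on both sides of the join
df1_clean = df1.withColumn('text', trim(lower(regexp_replace('text', r'[\s\p{Punct}]+', ' '))))
df2_clean = df2.withColumn('fruit_lookup', trim(lower(regexp_replace('fruit_lookup', r'[\s\p{Punct}]+', ' '))))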