Spark ML CountVectorizer 输出的说明

Explanation of Spark ML CountVectorizer output

请帮助理解 Spark ML 的输出 CountVectorizer 并建议解释它的文档。

val cv = new CountVectorizer()
  .setInputCol("Tokens")
  .setOutputCol("Frequencies")
  .setVocabSize(5000)
  .setMinTF(1)
  .setMinDF(2)
val fittedCV = cv.fit(tokenDF.select("Tokens"))
fittedCV.transform(tokenDF.select("Tokens")).show(false)

2374 应该是词典中的术语(单词)数。 什么是“[2,6,328,548,1234]”?

它们是字典中单词“[airline, bag, vintage, world, champion]”的索引吗?如果是这样,为什么同一个词 "airline" 在第二行有不同的索引“0”?

+------------------------------------------+----------------------------------------------------------------+
|Tokens                                    |Frequencies                                                     |
+------------------------------------------+----------------------------------------------------------------+
...
|[airline, bag, vintage, world, champion]  |(2374,[2,6,328,548,1234],[1.0,1.0,1.0,1.0,1.0])                 |
|[airline, bag, vintage, jet, set, brown]  |(2374,[0,2,6,328,405,620],[1.0,1.0,1.0,1.0,1.0,1.0])            |
+------------------------------------------+----------------------------------------------------------------+


  [1]: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.CountVectorizer

有一些 doc 解释了基础知识。然而,这很简单。

是的。数字代表词汇索引中的单词。然而,频率向量中的顺序与标记向量中的顺序不对应。 airline, bag, vintage 在两行中,因此它们对应于索引 [2,6,328]。但是您不能依赖相同的顺序。

行数据类型是 SparseVector。第一个数组显示索引,第二个数组显示值。

例如

vector[328] 
   => 1.0

映射如下:

vocabulary
airline 328
bag 6
vintage 2

Frequencies
2734, [2, 6 ,328], [99, 5, 7]

# counts
vintage x 99
bag x 5
airline 7

为了找回单词,您可以在词汇表中进行查找。这需要广播给不同的工人。您很可能还想将每个文档的计数分解成单独的行。

这是一些 python 代码片段,用于使用 udf 将每个文档的前 25 个常用词提取到单独的行中,并计算每个词的平均值

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import Row

vocabulary = sc.broadcast(fittedCV.vocabulary)

def _top_scores(v):
    # create count tuples for each index(i) in a vector(v)
    # `.item()` is used, because in python the count value is a numpy datatype, in `scala` it will be just double 

    counts = [Row(i=i.item(),count=v[i.item()].item()) for i in v.indices]
    # => [Row(i=2, count=30, Row(i=362, count=40)]

    # return 25 top count rows
    counts = sorted(counts, reverse=True, key=lambda x: x.count)
    return counts[:25]

top_scores = F.udf(_top_scores, T.ArrayType(T.StructType().add('i', T.IntegerType()).add('count', T.DoubleType())))                  
vec_to_word = F.udf(_vecToWord, T.StringType())


def _vecToWord(i):
    return vocabulary.value[i]



res = df.withColumn('word_count', explode(top_scores('Frequencies')))
=>
+-----+-----+----------+ 
doc_id, ..., word_count
             (i,  count)
+-----+-----+----------+
4711, ...,   (2, 30.0)
4711, ...,   (362, 40.0)
+-----+-----+----------+

res = res \
    .groupBy('word_count.i').agg( \
        avg('word_count.count').alias('mean')
    .orderBy('mean', ascending=False)

res = res.withColumn('token', vec_to_word('i')) 


=>
+---+---------+----------+ 
 i,   token,    mean
+---+---------+----------+ 
 2,   vintage,  15
 328, airline,  30  

+--+----------+----------+