Pyspark - Display Top 10 words of document

I am very new to Pyspark and used the following code to compute TF-IDF on a dataframe:

from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import udf
wordsData = spark.createDataFrame([
    (0, ["Hello", "World", "Spark", "Is", "Awesome", "Hello", "World"]),
    (1, ["Hello", "World", "Spark", "Is", "Awesome", "Hello", "World"]),
    (2, ["Hello", "World"]),
    (3, ["PYTHON", "Is", "Pretty", "Awesome"])
], ["label", "words"])

#hashingTF way
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.show(truncate=False)
print(hashingTF.indexOf("PYTHON"))  # hash index of "PYTHON" (HashingTF.indexOf requires Spark 3.0+)

Now I want to store the top 10 words and their TF-IDF values in separate columns. But since I am not really used to working with vectors, I am a bit confused about how to achieve this. I know I somehow need to apply the indexOf function to every token of a document to find the mapping to its value, but I don't know how. As far as I understand, each vector is built like this: (Size, [Key], [Value])

I was also thinking about using the CountVectorizer approach (and using its vocabulary for the mapping), but I ran into the same problem.

Can anyone help? This is the output so far:

+-----+------------------------------------------------+----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
|label|words                                           |rawFeatures                                                     |features                                                                                                                                       |
+-----+------------------------------------------------+----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
|0    |[Hello, World, Spark, Is, Awesome, Hello, World]|(262144,[32755,44691,64441,179674,262052],[2.0,2.0,1.0,1.0,1.0])|(262144,[32755,44691,64441,179674,262052],[0.44628710262841953,0.44628710262841953,0.22314355131420976,0.5108256237659907,0.22314355131420976])|
|1    |[Hello, World, Spark, Is, Awesome, Hello, World]|(262144,[32755,44691,64441,179674,262052],[2.0,2.0,1.0,1.0,1.0])|(262144,[32755,44691,64441,179674,262052],[0.44628710262841953,0.44628710262841953,0.22314355131420976,0.5108256237659907,0.22314355131420976])|
|2    |[Hello, World]                                  |(262144,[32755,44691],[1.0,1.0])                                |(262144,[32755,44691],[0.22314355131420976,0.22314355131420976])                                                                               |
|3    |[PYTHON, Is, Pretty, Awesome]                   |(262144,[61511,64441,191247,262052],[1.0,1.0,1.0,1.0])          |(262144,[61511,64441,191247,262052],[0.9162907318741551,0.22314355131420976,0.9162907318741551,0.22314355131420976])                           |
+-----+------------------------------------------------+----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------+

In your output data, rawFeatures and features are sparse vectors, which consist of 3 parts: size, indices and values.

For example, (262144,[32755,44691,64441,179674,262052],[0.44628710262841953,0.44628710262841953,0.22314355131420976,0.5108256237659907,0.22314355131420976])

Here size = 262144, indices = [32755,44691,64441,179674,262052], values = [0.44628710262841953,0.44628710262841953,0.22314355131420976,0.5108256237659907,0.22314355131420976]
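
As a standalone illustration (a minimal sketch, unrelated to the data above), a SparseVector exposes these three parts as attributes:

from pyspark.ml.linalg import Vectors

# sparse vector of size 5 with non-zero entries at indices 1 and 3
v = Vectors.sparse(5, [1, 3], [2.0, 1.0])
print(v.size)     # 5
print(v.indices)  # [1 3]
print(v.values)   # [2. 1.]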

The indices are the hash index values that the individual words are mapped to. From https://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf:
Our implementation of term frequency utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. Then term frequencies are calculated based on the mapped indices.

Now, to read the output vector and map it back to words, we can compute the hash index of each word with the same fitted model, look that index up in the features vector's indices, and take the corresponding value.
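
On Spark 3.0+ the hash index of a single term can also be looked up directly with HashingTF.indexOf (the same call as the print statement in your question); a minimal sketch, assuming the hashingTF defined above:

# hash bucket of a term; with the default numFeatures these match the indices shown in the tables
print(hashingTF.indexOf("Spark"))   # 179674
print(hashingTF.indexOf("PYTHON"))  # 191247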

1. First we get the hash index value of each word.

import pyspark.sql.functions as f

# one word per row; wrap it back into a single-element array because HashingTF expects an array column
ndf = wordsData.select('label',f.explode('words').name('expwords')).withColumn('words',f.array('expwords'))
hashudf = f.udf(lambda vector : vector.indices.tolist()[0],StringType())  # the single hash index of that word
wordtf = hashingTF.transform(ndf).withColumn('wordhash',hashudf(f.col('rawFeatures')))
wordtf.show()
+-----+--------+---------+--------------------+--------+
|label|expwords|    words|         rawFeatures|wordhash|
+-----+--------+---------+--------------------+--------+
|    0|   Spark|  [Spark]|(262144,[179674],...|  179674|
|    0| Awesome|[Awesome]|(262144,[262052],...|  262052|
|    0|   World|  [World]|(262144,[32755],[...|   32755|
|    0|   Hello|  [Hello]|(262144,[44691],[...|   44691|
|    0|      Is|     [Is]|(262144,[64441],[...|   64441|
|    2|   Hello|  [Hello]|(262144,[44691],[...|   44691|
|    2|   World|  [World]|(262144,[32755],[...|   32755|
|    1|      Is|     [Is]|(262144,[64441],[...|   64441|
|    1|   World|  [World]|(262144,[32755],[...|   32755|
|    1| Awesome|[Awesome]|(262144,[262052],...|  262052|
|    1|   Spark|  [Spark]|(262144,[179674],...|  179674|
|    1|   Hello|  [Hello]|(262144,[44691],[...|   44691|
|    3| Awesome|[Awesome]|(262144,[262052],...|  262052|
|    3|  PYTHON| [PYTHON]|(262144,[191247],...|  191247|
|    3|  Pretty| [Pretty]|(262144,[61511],[...|   61511|
|    3|      Is|     [Is]|(262144,[64441],[...|   64441|
+-----+--------+---------+--------------------+--------+

2. Flatten the output features column to get the indices & values.

# turn the sparse vector into a {index: tfidf value} map, then explode it into one row per index
udf1 = f.udf(lambda vec : dict(zip(vec.indices.tolist(),vec.values.tolist())),MapType(StringType(),StringType()))
valuedf = rescaledData.select('label',f.explode(udf1(f.col('features'))).name('wordhash','value'))
valuedf.show()
+-----+--------+-------------------+
|label|wordhash|              value|
+-----+--------+-------------------+
|    0|  179674| 0.5108256237659907|
|    0|   64441|0.22314355131420976|
|    0|   44691|0.44628710262841953|
|    0|   32755|0.44628710262841953|
|    0|  262052|0.22314355131420976|
|    1|  179674| 0.5108256237659907|
|    1|   64441|0.22314355131420976|
|    1|   44691|0.44628710262841953|
|    1|   32755|0.44628710262841953|
|    1|  262052|0.22314355131420976|
|    2|   44691|0.22314355131420976|
|    2|   32755|0.22314355131420976|
|    3|   64441|0.22314355131420976|
|    3|  191247| 0.9162907318741551|
|    3|  262052|0.22314355131420976|
|    3|   61511| 0.9162907318741551|
+-----+--------+-------------------+

3. Get the top n words of each document (label): filter by rank, join the two DataFrames, then collect and sort to get the words together with their values.

from pyspark.sql.window import Window
w = Window.partitionBy('label').orderBy(f.desc('value'))
valuedf = valuedf.withColumn('rank',f.rank().over(w)).where(f.col('rank')<=3) # used 3 for testing; use 10 for the top 10
valuedf.join(wordtf,['label','wordhash']).groupby('label')\
    .agg(f.sort_array(f.collect_list(f.struct(f.col('value'),f.col('expwords'))),asc=False).name('topn'))\
    .show(truncate=False)
+-----+-----------------------------------------------------------------------------------------------------------------------+
|label|topn                                                                                                                   |
+-----+-----------------------------------------------------------------------------------------------------------------------+
|0    |[{0.5108256237659907, Spark}, {0.44628710262841953, World}, {0.44628710262841953, Hello}]                              |
|1    |[{0.5108256237659907, Spark}, {0.44628710262841953, World}, {0.44628710262841953, Hello}]                              |
|3    |[{0.9162907318741551, Pretty}, {0.9162907318741551, PYTHON}, {0.22314355131420976, Is}, {0.22314355131420976, Awesome}]|
|2    |[{0.22314355131420976, World}, {0.22314355131420976, Hello}]                                                           |
+-----+-----------------------------------------------------------------------------------------------------------------------+
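
If you only need the word strings in rank order (without the TF-IDF values), you can pull the expwords field out of the sorted array of structs; a minimal sketch assuming the valuedf and wordtf built above (topdf and topwords are just illustrative names):

topdf = valuedf.join(wordtf,['label','wordhash']).groupby('label')\
    .agg(f.sort_array(f.collect_list(f.struct('value','expwords')),asc=False).name('topn'))
# selecting a field from an array of structs yields an array of that field, still in sorted order
topdf.select('label',f.col('topn.expwords').name('topwords')).show(truncate=False)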