Pyspark - Display Top 10 words of document
I am very new to PySpark, and I applied TF-IDF to a DataFrame using the following code:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import udf
wordsData = spark.createDataFrame([
    (0, ["Hello", "World", "Spark", "Is", "Awesome", "Hello", "World"]),
    (1, ["Hello", "World", "Spark", "Is", "Awesome", "Hello", "World"]),
    (2, ["Hello", "World"]),
    (3, ["PYTHON", "Is", "Pretty", "Awesome"])
], ["label", "words"])
#hashingTF way
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.show(truncate=False)
print(hashingTF.indexOf("PYTHON"))
Now I would like to store the top 10 words and their TF-IDF values in a separate column. But since I am not really used to working with vectors, I am a bit confused about how to achieve this. I know I somehow need to apply the indexOf function to every token of a document to find the mapping to its value, but I don't know how to do that. As far as I understand, each vector is built like this: (Size, [Indices], [Values]).
I was also considering the CountVectorizer approach (and using its vocabulary for this), but I ran into the same problem.
Can anyone help?
Here is the output so far:
+-----+------------------------------------------------+----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
|label|words |rawFeatures |features |
+-----+------------------------------------------------+----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
|0 |[Hello, World, Spark, Is, Awesome, Hello, World]|(262144,[32755,44691,64441,179674,262052],[2.0,2.0,1.0,1.0,1.0])|(262144,[32755,44691,64441,179674,262052],[0.44628710262841953,0.44628710262841953,0.22314355131420976,0.5108256237659907,0.22314355131420976])|
|1 |[Hello, World, Spark, Is, Awesome, Hello, World]|(262144,[32755,44691,64441,179674,262052],[2.0,2.0,1.0,1.0,1.0])|(262144,[32755,44691,64441,179674,262052],[0.44628710262841953,0.44628710262841953,0.22314355131420976,0.5108256237659907,0.22314355131420976])|
|2 |[Hello, World] |(262144,[32755,44691],[1.0,1.0]) |(262144,[32755,44691],[0.22314355131420976,0.22314355131420976]) |
|3 |[PYTHON, Is, Pretty, Awesome] |(262144,[61511,64441,191247,262052],[1.0,1.0,1.0,1.0]) |(262144,[61511,64441,191247,262052],[0.9162907318741551,0.22314355131420976,0.9162907318741551,0.22314355131420976]) |
+-----+------------------------------------------------+----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
In your output data, rawFeatures and features are sparse vectors with three parts: size, indices, and values.
For example, in (262144,[32755,44691,64441,179674,262052],[0.44628710262841953,0.44628710262841953,0.22314355131420976,0.5108256237659907,0.22314355131420976]):
size = 262144, indices = [32755,44691,64441,179674,262052], values = [0.44628710262841953,0.44628710262841953,0.22314355131420976,0.5108256237659907,0.22314355131420976]
The indices are the hash values that the individual words are mapped to.
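These three parts are directly accessible in Python. A minimal sketch (the vector below is a hand-built toy example, not one taken from the fitted model):
from pyspark.ml.linalg import SparseVector

# a sparse vector is (size, indices, values)
v = SparseVector(262144, [32755, 44691], [1.0, 1.0])
print(v.size)     # 262144
print(v.indices)  # [32755 44691]
print(v.values)   # [1. 1.]
print(v[32755])   # 1.0 -- look up a single position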
From https://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf:
Our implementation of term frequency utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. Then term frequencies are calculated based on the mapped indices.
Now, to read the output vectors and map them back to words, we can compute the hash of each word with the same fitted model, map it to an index in the features vector, and pick up the corresponding value.
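As a quick sanity check (not part of the pipeline below), the same hash index can also be computed on the driver with hashingTF.indexOf, using the transformer defined in the question:
# indexOf applies the same hash function that transform() uses
for w in ["Hello", "World", "Spark", "Is", "Awesome"]:
    print(w, hashingTF.indexOf(w))  # e.g. Hello 44691, World 32755, ...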
1. First, we get the hash index value for each word. Each word is wrapped in a single-element array so the same hashingTF transformer can be applied to it:
import pyspark.sql.functions as f

# one row per (label, word)
ndf = wordsData.select('label', f.explode('words').name('expwords')).withColumn('words', f.array('expwords'))
hashudf = f.udf(lambda vector: vector.indices.tolist()[0], StringType())
wordtf = hashingTF.transform(ndf).withColumn('wordhash', hashudf(f.col('rawFeatures')))
wordtf.show()
+-----+--------+---------+--------------------+--------+
|label|expwords| words| rawFeatures|wordhash|
+-----+--------+---------+--------------------+--------+
| 0| Spark| [Spark]|(262144,[179674],...| 179674|
| 0| Awesome|[Awesome]|(262144,[262052],...| 262052|
| 0| World| [World]|(262144,[32755],[...| 32755|
| 0| Hello| [Hello]|(262144,[44691],[...| 44691|
| 0| Is| [Is]|(262144,[64441],[...| 64441|
| 2| Hello| [Hello]|(262144,[44691],[...| 44691|
| 2| World| [World]|(262144,[32755],[...| 32755|
| 1| Is| [Is]|(262144,[64441],[...| 64441|
| 1| World| [World]|(262144,[32755],[...| 32755|
| 1| Awesome|[Awesome]|(262144,[262052],...| 262052|
| 1| Spark| [Spark]|(262144,[179674],...| 179674|
| 1| Hello| [Hello]|(262144,[44691],[...| 44691|
| 3| Awesome|[Awesome]|(262144,[262052],...| 262052|
| 3| PYTHON| [PYTHON]|(262144,[191247],...| 191247|
| 3| Pretty| [Pretty]|(262144,[61511],[...| 61511|
| 3| Is| [Is]|(262144,[64441],[...| 64441|
+-----+--------+---------+--------------------+--------+
2. Flatten the output features column to get the indices & values.
# map each features vector to {hash index: tfidf value}, then explode that map into rows
udf1 = f.udf(lambda vec: dict(zip(vec.indices.tolist(), vec.values.tolist())), MapType(StringType(), StringType()))
valuedf = rescaledData.select('label', f.explode(udf1(f.col('features'))).name('wordhash', 'value'))
valuedf.show()
+-----+--------+-------------------+
|label|wordhash| value|
+-----+--------+-------------------+
| 0| 179674| 0.5108256237659907|
| 0| 64441|0.22314355131420976|
| 0| 44691|0.44628710262841953|
| 0| 32755|0.44628710262841953|
| 0| 262052|0.22314355131420976|
| 1| 179674| 0.5108256237659907|
| 1| 64441|0.22314355131420976|
| 1| 44691|0.44628710262841953|
| 1| 32755|0.44628710262841953|
| 1| 262052|0.22314355131420976|
| 2| 44691|0.22314355131420976|
| 2| 32755|0.22314355131420976|
| 3| 64441|0.22314355131420976|
| 3| 191247| 0.9162907318741551|
| 3| 262052|0.22314355131420976|
| 3| 61511| 0.9162907318741551|
+-----+--------+-------------------+
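For a single document, the same flattening can be checked on the driver without a UDF; a small verification using the rescaledData defined earlier:
# inspect one row's features vector locally: {hash index: tfidf value}
row = rescaledData.where('label = 3').first()
print(dict(zip(row['features'].indices.tolist(), row['features'].values.tolist())))
# {61511: 0.9162907318741551, 64441: 0.22314355131420976, 191247: 0.9162907318741551, 262052: 0.22314355131420976}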
3. Get the top n words for each document (label): rank the values, filter by rank, join the two DataFrames, then collect and sort to get each word with its value.
from pyspark.sql import Window

w = Window.partitionBy('label').orderBy(f.desc('value'))
valuedf = valuedf.withColumn('rank', f.rank().over(w)).where(f.col('rank') <= 3)  # used 3 for testing; use 10 for the top 10 words
valuedf.join(wordtf, ['label', 'wordhash']).groupby('label').agg(f.sort_array(f.collect_list(f.struct(f.col('value'), f.col('expwords'))), asc=False).name('topn')).show(truncate=False)
+-----+-----------------------------------------------------------------------------------------------------------------------+
|label|topn |
+-----+-----------------------------------------------------------------------------------------------------------------------+
|0 |[{0.5108256237659907, Spark}, {0.44628710262841953, World}, {0.44628710262841953, Hello}] |
|1 |[{0.5108256237659907, Spark}, {0.44628710262841953, World}, {0.44628710262841953, Hello}] |
|3 |[{0.9162907318741551, Pretty}, {0.9162907318741551, PYTHON}, {0.22314355131420976, Is}, {0.22314355131420976, Awesome}]|
|2 |[{0.22314355131420976, World}, {0.22314355131420976, Hello}] |
+-----+-----------------------------------------------------------------------------------------------------------------------+
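Regarding the CountVectorizer idea mentioned in the question: CountVectorizerModel.vocabulary is a plain index-to-word list, so the hash lookup and the join are not needed. A minimal sketch under that assumption (cv, cvModel, cvIdfModel, cvData and topn_udf are illustrative names, not part of the answer above):
from pyspark.ml.feature import CountVectorizer, IDF
import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType

cv = CountVectorizer(inputCol='words', outputCol='rawFeatures')
cvModel = cv.fit(wordsData)
cvIdfModel = IDF(inputCol='rawFeatures', outputCol='features').fit(cvModel.transform(wordsData))
cvData = cvIdfModel.transform(cvModel.transform(wordsData))

vocab = cvModel.vocabulary  # list: vector index -> word

@f.udf(ArrayType(StructType([StructField('word', StringType()), StructField('value', DoubleType())])))
def topn_udf(vec, n=10):
    # sort (index, value) pairs by tfidf value and translate indices through the vocabulary
    pairs = sorted(zip(vec.indices.tolist(), vec.values.tolist()), key=lambda kv: -kv[1])[:n]
    return [(vocab[i], float(v)) for i, v in pairs]

cvData.select('label', topn_udf('features').alias('topn')).show(truncate=False)
This keeps the top 10 words and their tfidf values in a single column per label, at the cost of fitting an explicit vocabulary (bounded by CountVectorizer's vocabSize parameter).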