Adding the resulting TFIDF calculation to the dataframe of the original documents in Pyspark

I am using Spark MLlib to compute, for each document, the sum of the TFIDF values of all its terms (each document is described by one row of the dataframe). I wrote the following code:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.linalg import SparseVector

sc = SparkContext() 
sqlContext = SQLContext(sc)

#SECTION 1: build the source DataFrame (doc_id, doc_text, another)
documents = sqlContext.createDataFrame([
    (0, "hello spark", "data1"),
    (1, "this is example", "data2"),
    (2, "spark is fast","data3"),
    (3, "hello world","data4")], ["doc_id", "doc_text", "another"])

#SECTION 2: select the text column and split each document into words
documents.registerTempTable("doc_table")
textcolumn = sqlContext.sql("SELECT doc_text FROM doc_table")
doc_words = textcolumn.map(lambda d: d.doc_text).map(lambda t: t.split(" "))

#SECTION 3: compute term frequencies, fit IDF and produce the TFIDF vectors
hashingTF = HashingTF()
tf = hashingTF.transform(doc_words).cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf).cache()

#SECTION 4: sum the TFIDF values of each document
sumrdd = tfidf.map(lambda v: v.values.sum())
print('\n Summation of TFIDF for each document:')
print(sumrdd.collect())

I get the following result:

[1.0216512475319814, 2.3434070875143007, 1.9379419794061366, 1.4271163556401458]

Could someone help me modify the code so that the original data (doc_id, doc_text, another) stays linked to each document through the tf, idf, and tfidf computations? Since my dataframe has thousands of rows, I have to be sure that every document is correctly joined to its summed TFIDF. In the end I would like a result (or a dataframe) like this:

(0, "hello spark", "data1", 1.0216512475319814)
(1, "this is example", "data2", 2.3434070875143007)
(2, "spark is fast","data3",1.9379419794061366)
(3, "hello world","data4",1.4271163556401458)

Solution 1:

This is not the best solution, but the main idea is to use spark-ml instead, which keeps the information from your DataFrame:

# I used alias to avoid confusion with the mllib library
from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.ml.feature import IDF as MLIDF
from pyspark.sql.types import DoubleType

documents = sqlContext.createDataFrame([
    (0, "hello spark", "data1"),
    (1, "this is example", "data2"),
    (2, "spark is fast","data3"),
    (3, "hello world","data4")], ["doc_id", "doc_text", "another"])

documents.printSchema()
# root
# |-- doc_id: long (nullable = true)
# |-- doc_text: string (nullable = true)
# |-- another: string (nullable = true)

df = (documents
  .rdd
  .map(lambda x : (x.doc_id,x.doc_text.split(" ")))
  .toDF()
  .withColumnRenamed("_1","doc_id")
  .withColumnRenamed("_2","features"))

htf = MLHashingTF(inputCol="features", outputCol="tf")
tf = htf.transform(df)
tf.show(truncate=False)
# +------+-------------------+------------------------------------------+
# |doc_id|features           |tf                                        |
# +------+-------------------+------------------------------------------+
# |0     |[hello, spark]     |(262144,[62173,71890],[1.0,1.0])          |
# |1     |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|
# |2     |[spark, is, fast]  |(262144,[3370,62173,251996],[1.0,1.0,1.0])|
# |3     |[hello, world]     |(262144,[71890,72594],[1.0,1.0])          |
# +------+-------------------+------------------------------------------+

idf = MLIDF(inputCol="tf", outputCol="idf")
tfidf = idf.fit(tf).transform(tf)
tfidf.show(truncate=False)
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |doc_id|features           |tf                                        |idf                                                                                    |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |0     |[hello, spark]     |(262144,[62173,71890],[1.0,1.0])          |(262144,[62173,71890],[0.5108256237659907,0.5108256237659907])                         |
# |1     |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|(262144,[3370,69994,151198],[0.5108256237659907,0.9162907318741551,0.9162907318741551])|
# |2     |[spark, is, fast]  |(262144,[3370,62173,251996],[1.0,1.0,1.0])|(262144,[3370,62173,251996],[0.5108256237659907,0.5108256237659907,0.9162907318741551])|
# |3     |[hello, world]     |(262144,[71890,72594],[1.0,1.0])          |(262144,[71890,72594],[0.5108256237659907,0.9162907318741551])                         |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+

res = tfidf.rdd.map(lambda x : (x.doc_id,x.features,x.tf,x.idf,(None if x.idf is None else x.idf.values.sum())))

for r in res.take(10):
    print(r)

# (0, [u'hello', u'spark'], SparseVector(262144, {62173: 1.0, 71890: 1.0}), SparseVector(262144, {62173: 0.5108, 71890: 0.5108}), 1.0216512475319814)
# (1, [u'this', u'is', u'example'], SparseVector(262144, {3370: 1.0, 69994: 1.0, 151198: 1.0}), SparseVector(262144, {3370: 0.5108, 69994: 0.9163, 151198: 0.9163}), 2.3434070875143007)
# (2, [u'spark', u'is', u'fast'], SparseVector(262144, {3370: 1.0, 62173: 1.0, 251996: 1.0}), SparseVector(262144, {3370: 0.5108, 62173: 0.5108, 251996: 0.9163}), 1.9379419794061366)
# (3, [u'hello', u'world'], SparseVector(262144, {71890: 1.0, 72594: 1.0}), SparseVector(262144, {71890: 0.5108, 72594: 0.9163}), 1.4271163556401458)
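
If you also need the remaining original columns (such as another) in the final result, one option is to turn the summed values into a DataFrame and join them back to the original documents on doc_id. This is just a minimal sketch, assuming doc_id uniquely identifies each row; the column name tfidf_sum is only illustrative:

# Sketch: keep only (doc_id, sum), name the columns, and join back to `documents`
sums_df = (res
  .map(lambda x: (x[0], float(x[4])))
  .toDF()
  .withColumnRenamed("_1", "doc_id")
  .withColumnRenamed("_2", "tfidf_sum"))

documents.join(sums_df, "doc_id").show()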

Solution 2:

You could also use a UDF (this reuses the tfidf DataFrame and the DoubleType import from Solution 1):

from pyspark.sql.functions import udf

sum_ = udf(lambda v: float(v.values.sum()), DoubleType())
tfidf.withColumn("idf_sum", sum_("idf")).show()

## +------+-------------------+--------------------+--------------------+------------------+
## |doc_id|           features|                  tf|                 idf|           idf_sum|
## +------+-------------------+--------------------+--------------------+------------------+
## |     0|     [hello, spark]|(262144,[62173,71...|(262144,[62173,71...|1.0216512475319814|
## |     1|[this, is, example]|(262144,[3370,699...|(262144,[3370,699...|2.3434070875143007|
## |     2|  [spark, is, fast]|(262144,[3370,621...|(262144,[3370,621...|1.9379419794061366|
## |     3|     [hello, world]|(262144,[71890,72...|(262144,[71890,72...|1.4271163556401458|
## +------+-------------------+--------------------+--------------------+------------------+
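
To end up with exactly the four columns from the question (doc_id, doc_text, another and the sum), you could keep only doc_id and idf_sum and join back to the original documents DataFrame. Again only a sketch, assuming doc_id is unique:

# Sketch: attach the per-document sum back to the full original rows
with_sum = tfidf.withColumn("idf_sum", sum_("idf")).select("doc_id", "idf_sum")
documents.join(with_sum, "doc_id").orderBy("doc_id").show()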