From TF-IDF to LDA clustering in spark, pyspark

I am trying to cluster tweets that are stored in the format key,listofwords.

My first step is to extract TF-IDF values for the lists of words using a DataFrame:
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

dbURL = "hdfs://pathtodir"
file = sc.textFile(dbURL)
# Define data frame schema
fields = [StructField('key', StringType(), False), StructField('content', StringType(), False)]
schema = StructType(fields)
# Data in format <key>,<listofwords>
file_temp = file.map(lambda l: l.split(","))
file_df = sqlContext.createDataFrame(file_temp, schema)
# Extract TF-IDF. From https://spark.apache.org/docs/1.5.2/ml-features.html
tokenizer = Tokenizer(inputCol='content', outputCol='words')
wordsData = tokenizer.transform(file_df)
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=1000)
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol='rawFeatures', outputCol='features')
idfModel = idf.fit(featurizedData)
rescaled_data = idfModel.transform(featurizedData)

Following the suggestion in this example, I then tried to reformat the output into what I expect to be a valid input to LDA, and started with:

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='key', outputCol='KeyIndex')
indexed_data = indexer.fit(rescaled_data).transform(rescaled_data).drop('key').drop('content').drop('words').drop('rawFeatures')

But now I cannot find a good way to transform my DataFrame into the format suggested in the previous example or in this example.

I would be grateful if someone could point me to the right place to look, or correct my approach if it is wrong.

I would have thought that extracting TF-IDF vectors from a collection of documents and clustering them is a fairly classic task, but I have not been able to find a simple way to do it.

LDA expects an (id, features) pair as input, so, assuming KeyIndex serves as the document ID:

from pyspark.mllib.clustering import LDA
from pyspark.sql.functions import col

k = ... # number of clusters
# build an RDD where each element is [document id (long), feature vector]
corpus = indexed_data.select(col("KeyIndex").cast("long"), "features").map(list)
model = LDA.train(corpus, k=k)
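
To see what the trained model actually learned, here is a minimal inspection sketch along the lines of the official MLlib example (assuming model and k from above; note that the term indices here are HashingTF hash buckets, so they cannot be mapped back to the original words):

topics = model.topicsMatrix()  # vocabSize x k matrix: one column of term weights per topic
for topic in range(k):
    print("Topic " + str(topic) + ":")
    for term in range(10):  # print only the first 10 term weights per topic
        print("  term " + str(term) + ": " + str(topics[term][topic]))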

LDA does not take a TF-IDF matrix as input. Instead, it accepts only a TF (raw term-count) matrix. For example:

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import LDA

tokenizer = Tokenizer(inputCol="hashTagDocument", outputCol="words")

stopWordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered",
                                    stopWords=stopwords)  # stopwords: your own list of stop words

vectorizer = CountVectorizer(inputCol="filtered", outputCol="features",
                             vocabSize=40000, minDF=5)

lda = LDA(k=10, maxIter=50)  # example values for k and maxIter

pipeline = Pipeline(stages=[tokenizer, stopWordsRemover, vectorizer, lda])
pipelineModel = pipeline.fit(corpus)  # corpus: DataFrame with a "hashTagDocument" column

pipelineModel.stages
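
To pull readable topics out of the fitted pipeline, a rough sketch (assuming the stage order above; unlike HashingTF, CountVectorizer keeps a vocabulary, so the term indices returned by describeTopics can be mapped back to words):

ldaModel = pipelineModel.stages[-1]          # the fitted LDAModel is the last stage
vocab = pipelineModel.stages[2].vocabulary   # CountVectorizerModel exposes the learned vocabulary

# top terms per topic as a DataFrame of (topic, termIndices, termWeights)
ldaModel.describeTopics(maxTermsPerTopic=10).show(truncate=False)

# per-document topic distributions
pipelineModel.transform(corpus).select("topicDistribution").show(truncate=False)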