在 spark 中为 LDA 准备数据
Preparing data for LDA in spark
我正在努力实施 Spark LDA 模型(通过 Scala API),但在为我的数据执行必要的格式化步骤时遇到了问题。我的原始数据(存储在文本文件中)采用以下格式,本质上是令牌列表及其对应的文档。一个简化的例子:
doc XXXXX term XXXXX
1 x 'a' x
1 x 'a' x
1 x 'b' x
2 x 'b' x
2 x 'd' x
...
其中 XXXXX 列是我不关心的垃圾数据。我意识到这是一种非典型的语料库数据存储方式,但这就是我所拥有的。正如我希望从示例中清楚的那样,原始数据中每个 token 一行(因此,如果给定术语在文档中出现 5 次,则对应于 5 行文本) .
无论如何,我需要将这些数据格式化为稀疏词频向量,用于 运行 Spark LDA 模型,但我不熟悉 Scala,所以遇到了一些麻烦。
我开始于:
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
val corpus:RDD[Array[String]] = sc.textFile("path/to/data")
.map(_.split('\t')).map(x => Array(x(0),x(2)))
然后我得到生成稀疏向量所需的词汇数据:
val vocab: RDD[String] = corpus.map(_(1)).distinct()
val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap
我不知道在这里使用的映射函数是否合适,这样我就可以为每个文档得到一个稀疏词频向量,然后我可以将其输入 LDA 模型。我想我需要这些方面的东西...
val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex
.map(x =>(x._2,Vectors.sparse(vocabMap.size, ???)))
此时我可以 运行 实际的 LDA:
val lda = new LDA().setK(n_topics)
val ldaModel = lda.run(documents)
基本上,我不知道要对每个组应用什么函数,以便我可以将词频数据(大概是 map
?)输入到稀疏向量中。也就是说,我要如何填写上面代码段中的???
才能达到想要的效果?
一种处理方法:
- 确保
spark-csv
包可用
将数据加载到 DataFrame 和 select 感兴趣的列
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true") // Optional, providing schema is prefered
.option("delimiter", "\t")
.load("foo.csv")
.select($"doc".cast("long").alias("doc"), $"term")
索引term
列:
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
.setInputCol("term")
.setOutputCol("termIndexed")
val indexed = indexer.fit(df)
.transform(df)
.drop("term")
.withColumn("termIndexed", $"termIndexed".cast("integer"))
.groupBy($"doc", $"termIndexed")
.agg(count(lit(1)).alias("cnt").cast("double"))
转换为PairwiseRDD
import org.apache.spark.sql.Row
val pairs = indexed.map{case Row(doc: Long, term: Int, cnt: Double) =>
(doc, (term, cnt))}
按文档分组:
val docs = pairs.groupByKey
创建特征向量
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.functions.max
val n = indexed.select(max($"termIndexed")).first.getInt(0) + 1
val docsWithFeatures = docs.mapValues(vs => Vectors.sparse(n, vs.toSeq))
现在您拥有了创建LabeledPoints
或应用额外处理
所需的一切
我正在努力实施 Spark LDA 模型(通过 Scala API),但在为我的数据执行必要的格式化步骤时遇到了问题。我的原始数据(存储在文本文件中)采用以下格式,本质上是令牌列表及其对应的文档。一个简化的例子:
doc XXXXX term XXXXX
1 x 'a' x
1 x 'a' x
1 x 'b' x
2 x 'b' x
2 x 'd' x
...
其中 XXXXX 列是我不关心的垃圾数据。我意识到这是一种非典型的语料库数据存储方式,但这就是我所拥有的。正如我希望从示例中清楚的那样,原始数据中每个 token 一行(因此,如果给定术语在文档中出现 5 次,则对应于 5 行文本) .
无论如何,我需要将这些数据格式化为稀疏词频向量,用于 运行 Spark LDA 模型,但我不熟悉 Scala,所以遇到了一些麻烦。
我开始于:
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
val corpus:RDD[Array[String]] = sc.textFile("path/to/data")
.map(_.split('\t')).map(x => Array(x(0),x(2)))
然后我得到生成稀疏向量所需的词汇数据:
val vocab: RDD[String] = corpus.map(_(1)).distinct()
val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap
我不知道在这里使用的映射函数是否合适,这样我就可以为每个文档得到一个稀疏词频向量,然后我可以将其输入 LDA 模型。我想我需要这些方面的东西...
val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex
.map(x =>(x._2,Vectors.sparse(vocabMap.size, ???)))
此时我可以 运行 实际的 LDA:
val lda = new LDA().setK(n_topics)
val ldaModel = lda.run(documents)
基本上,我不知道要对每个组应用什么函数,以便我可以将词频数据(大概是 map
?)输入到稀疏向量中。也就是说,我要如何填写上面代码段中的???
才能达到想要的效果?
一种处理方法:
- 确保
spark-csv
包可用 将数据加载到 DataFrame 和 select 感兴趣的列
val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("inferSchema", "true") // Optional, providing schema is prefered .option("delimiter", "\t") .load("foo.csv") .select($"doc".cast("long").alias("doc"), $"term")
索引
term
列:import org.apache.spark.ml.feature.StringIndexer val indexer = new StringIndexer() .setInputCol("term") .setOutputCol("termIndexed") val indexed = indexer.fit(df) .transform(df) .drop("term") .withColumn("termIndexed", $"termIndexed".cast("integer")) .groupBy($"doc", $"termIndexed") .agg(count(lit(1)).alias("cnt").cast("double"))
转换为
PairwiseRDD
import org.apache.spark.sql.Row val pairs = indexed.map{case Row(doc: Long, term: Int, cnt: Double) => (doc, (term, cnt))}
按文档分组:
val docs = pairs.groupByKey
创建特征向量
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.sql.functions.max val n = indexed.select(max($"termIndexed")).first.getInt(0) + 1 val docsWithFeatures = docs.mapValues(vs => Vectors.sparse(n, vs.toSeq))
现在您拥有了创建
LabeledPoints
或应用额外处理 所需的一切