Can't run LDA on Dataset[(scala.Long, org.apache.spark.mllib.linalg.Vector)] in Spark 2.0

I was following this tutorial video on an LDA example, and I'm running into the following issue:

<console>:37: error: overloaded method value run with alternatives:
  (documents: org.apache.spark.api.java.JavaPairRDD[java.lang.Long,org.apache.spark.mllib.linalg.Vector])org.apache.spark.mllib.clustering.LDAModel <and>
  (documents: org.apache.spark.rdd.RDD[(scala.Long, org.apache.spark.mllib.linalg.Vector)])org.apache.spark.mllib.clustering.LDAModel
  cannot be applied to (org.apache.spark.sql.Dataset[(scala.Long, org.apache.spark.mllib.linalg.Vector)])
     val model = run(lda_countVector)
                                   ^

So I want to convert this DF to an RDD, but it always ends up typed as a Dataset. Can anyone look into this issue?

// Convert DF to RDD
import org.apache.spark.mllib.linalg.Vector
val lda_countVector = countVectors.map { case Row(id: Long, countVector: Vector) => (id, countVector) }
// import org.apache.spark.mllib.linalg.Vector
// lda_countVector: org.apache.spark.sql.Dataset[(Long, org.apache.spark.mllib.linalg.Vector)] = [_1: bigint, _2: vector]

The Spark API changed between the 1.x and 2.x branches. In particular, DataFrame.map returns a Dataset, not an RDD, so the result is not compatible with the old, RDD-based MLlib API. You should convert your data to an RDD first:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.sql.Row

// Toy corpus: (document id, term-count vector)
val a = Vectors.dense(Array(1.0, 2.0, 3.0))
val b = Vectors.dense(Array(3.0, 4.0, 5.0))
val df = Seq((1L, a), (2L, b), (2L, a)).toDF

// df.rdd is an RDD[Row]; pattern match on each Row to recover the typed pairs
val ldaDF = df.rdd.map {
  case Row(id: Long, countVector: Vector) => (id, countVector)
}

val model = new LDA().setK(3).run(ldaDF)
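
Once the model is trained you can sanity-check it by printing the per-topic term weights. A minimal sketch using the standard mllib LDAModel.describeTopics API (the choice of 3 terms per topic here is arbitrary):

// Top terms (as vocabulary indices) and their weights for each topic
model.describeTopics(maxTermsPerTopic = 3).zipWithIndex.foreach {
  case ((termIndices, termWeights), topic) =>
    println(s"Topic $topic: " + termIndices.zip(termWeights).mkString(", "))
}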

Alternatively, you can convert to a typed Dataset first and then to an RDD:

val model = new LDA().setK(3).run(df.as[(Long, Vector)].rdd)
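
Note that df.as[(Long, Vector)] needs an implicit Encoder for the tuple in scope. The SparkSession implicits are pre-imported in spark-shell; in a standalone application you would add import spark.implicits._ (assuming your SparkSession is named spark).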