如何将 org.apache.spark.rdd.RDD[Array[Double]] 转换为 Spark MLlib 所需的 Array[Double]

How to convert org.apache.spark.rdd.RDD[Array[Double]] to Array[Double] which is required by Spark MLlib

我正在尝试实施 KMeans using Apache Spark

val data = sc.textFile(irisDatasetString)
val parsedData = data.map(_.split(',').map(_.toDouble)).cache()

val clusters = KMeans.train(parsedData,3,numIterations = 20)

出现以下错误:

error: overloaded method value train with alternatives:
  (data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector],k: Int,maxIterations: Int,runs: Int)org.apache.spark.mllib.clustering.KMeansModel <and>
  (data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector],k: Int,maxIterations: Int)org.apache.spark.mllib.clustering.KMeansModel <and>
  (data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector],k: Int,maxIterations: Int,runs: Int,initializationMode: String)org.apache.spark.mllib.clustering.KMeansModel
 cannot be applied to (org.apache.spark.rdd.RDD[Array[Double]], Int, numIterations: Int)
       val clusters = KMeans.train(parsedData,3,numIterations = 20)

所以我尝试将 Array[Double] 转换为 Vector,如图 here

scala> val vectorData: Vector = Vectors.dense(parsedData)

我收到以下错误:

error: type Vector takes type parameters
   val vectorData: Vector = Vectors.dense(parsedData)
                   ^
error: overloaded method value dense with alternatives:
  (values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
  (firstValue: Double,otherValues: Double*)org.apache.spark.mllib.linalg.Vector
 cannot be applied to (org.apache.spark.rdd.RDD[Array[Double]])
       val vectorData: Vector = Vectors.dense(parsedData)

所以我推断 org.apache.spark.rdd.RDD[Array[Double]] 与 Array[Double]

不同

如何以 org.apache.spark.rdd.RDD[Array[Double]] 的形式处理我的数据?或者如何转换 org.apache.spark.rdd.RDD[Array[Double]] to Array[Double] ?

KMeans.train 期望 RDD[Vector] 而不是 RDD[Array[Double]]。在我看来,你需要做的就是改变

val parsedData = data.map(_.split(',').map(_.toDouble)).cache()

val parsedData = data.map(x => Vectors.dense(x.split(',').map(_.toDouble))).cache()