将 Rdd[Vector] 转换为 Rdd[Double]

Convert Rdd[Vector] to Rdd[Double]

如何将 csv 转换为 Rdd[Double]?我有错误:不能应用于 (org.apache.spark.rdd.RDD[Unit]) 在这一行:

val kd = new KernelDensity().setSample(rows) 

我的完整代码在这里:

   import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    import org.apache.spark.mllib.stat.KernelDensity
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkContext, SparkConf}

class KdeAnalysis {
  val conf = new SparkConf().setAppName("sample").setMaster("local")
  val sc = new SparkContext(conf)

  val DATAFILE: String = "C:\Users\ajohn\Desktop\spark_R\data\mass_cytometry\mass.csv"
  val rows = sc.textFile(DATAFILE).map {
    line => val values = line.split(',').map(_.toDouble)
      Vectors.dense(values)
  }.cache()



  // Construct the density estimator with the sample data and a standard deviation for the Gaussian
  // kernels
  val rdd : RDD[Double] = sc.parallelize(rows)
  val kd = new KernelDensity().setSample(rdd)
    .setBandwidth(3.0)

  // Find density estimates for the given values
  val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
}

因为 rows 是一个 RDD[org.apache.spark.mllib.linalg.Vector] 下面的行不能工作:

val rdd : RDD[Double] = sc.parallelize(rows)

parallelize 期望 Seq[T]RDD 不是 Seq.

即使这部分按您预期的那样工作,您的输入也完全是错误的。 KernelDensity.setSample 的正确参数是 RDD[Double]JavaRDD[java.lang.Double]。貌似暂时不支持多元数据。

关于磁贴中的问题,您可以flatMap

rows.flatMap(_.toArray)

甚至更好,当您创建 rows

val rows = sc.textFile(DATAFILE).flatMap(_.split(',').map(_.toDouble)).cache()

但我怀疑它是否真的是您所需要的。

已准备好此代码,请评价是否对您有帮助->

val doubleRDD = rows.map(_.toArray).flatMap(x => x)