如何将 RDD[Row] 转换为 RDD[Vector]

How to convert RDD[Row] to RDD[Vector]

我正在尝试使用 scala 实现 k-means 方法。 我创建了一个类似这样的 RDD

val df = sc.parallelize(data).groupByKey().collect().map((chunk)=> {
  sc.parallelize(chunk._2.toSeq).toDF()
})

val examples = df.map(dataframe =>{
  dataframe.selectExpr(
    "avg(time) as avg_time",
    "variance(size) as var_size",
    "variance(time) as var_time",
    "count(size) as examples"
  ).rdd
})

val rdd_final=examples.reduce(_ union _)

val kmeans= new KMeans()
val model = kmeans.run(rdd_final)

使用这段代码我得到一个错误

type mismatch;
[error]  found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error]  required:org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]

所以我尝试投射:

val rdd_final_Vector = rdd_final.map{x:Row => x.getAs[org.apache.spark.mllib.linalg.Vector](0)}

val model = kmeans.run(rdd_final_Vector)

但是我得到一个错误:

java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.mllib.linalg.Vector

所以我正在寻找一种方法来执行该转换,但我找不到任何方法。

有什么想法吗?

此致

这里至少有几个问题:

  1. 你真的不能将 Row 转换为 Vector:Row 是 Spark SQL 理解的可能不同类型的集合。 Vector 不是原生 spark sql 类型
  2. 您的 SQL 语句的内容与您试图通过 KMeans 实现的内容似乎不匹配:SQL 正在执行聚合。但是 KMeans 需要一系列单独的数据点,形式为向量(封装 Array[Double])。那么 - 为什么要向 KMeans 操作提供 sumaverage

此处仅解决#1:您需要按照以下方式做一些事情:

val doubVals = <rows rdd>.map{ row =>   row.getDouble("colname") }
val vector = Vectors.toDense{ doubVals.collect}

那么您就有了一个正确封装的 Array[Double](在 Vector 中),可以提供给 Kmeans