如何将 RDD[Row] 转换为 RDD[Vector]
How to convert RDD[Row] to RDD[Vector]
我正在尝试使用 scala 实现 k-means 方法。
我创建了一个类似这样的 RDD
val df = sc.parallelize(data).groupByKey().collect().map((chunk)=> {
sc.parallelize(chunk._2.toSeq).toDF()
})
val examples = df.map(dataframe =>{
dataframe.selectExpr(
"avg(time) as avg_time",
"variance(size) as var_size",
"variance(time) as var_time",
"count(size) as examples"
).rdd
})
val rdd_final=examples.reduce(_ union _)
val kmeans= new KMeans()
val model = kmeans.run(rdd_final)
使用这段代码我得到一个错误
type mismatch;
[error] found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error] required:org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
所以我尝试投射:
val rdd_final_Vector = rdd_final.map{x:Row => x.getAs[org.apache.spark.mllib.linalg.Vector](0)}
val model = kmeans.run(rdd_final_Vector)
但是我得到一个错误:
java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.mllib.linalg.Vector
所以我正在寻找一种方法来执行该转换,但我找不到任何方法。
有什么想法吗?
此致
这里至少有几个问题:
- 不 你真的不能将 Row 转换为 Vector:Row 是
Spark SQL
理解的可能不同类型的集合。 Vector
不是原生 spark sql 类型
- 您的 SQL 语句的内容与您试图通过
KMeans
实现的内容似乎不匹配:SQL 正在执行聚合。但是 KMeans
需要一系列单独的数据点,形式为向量(封装 Array[Double]
)。那么 - 为什么要向 KMeans
操作提供 sum
和 average
?
此处仅解决#1:您需要按照以下方式做一些事情:
val doubVals = <rows rdd>.map{ row => row.getDouble("colname") }
val vector = Vectors.toDense{ doubVals.collect}
那么您就有了一个正确封装的 Array[Double]
(在 Vector 中),可以提供给 Kmeans
。
我正在尝试使用 scala 实现 k-means 方法。 我创建了一个类似这样的 RDD
val df = sc.parallelize(data).groupByKey().collect().map((chunk)=> {
sc.parallelize(chunk._2.toSeq).toDF()
})
val examples = df.map(dataframe =>{
dataframe.selectExpr(
"avg(time) as avg_time",
"variance(size) as var_size",
"variance(time) as var_time",
"count(size) as examples"
).rdd
})
val rdd_final=examples.reduce(_ union _)
val kmeans= new KMeans()
val model = kmeans.run(rdd_final)
使用这段代码我得到一个错误
type mismatch;
[error] found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error] required:org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
所以我尝试投射:
val rdd_final_Vector = rdd_final.map{x:Row => x.getAs[org.apache.spark.mllib.linalg.Vector](0)}
val model = kmeans.run(rdd_final_Vector)
但是我得到一个错误:
java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.mllib.linalg.Vector
所以我正在寻找一种方法来执行该转换,但我找不到任何方法。
有什么想法吗?
此致
这里至少有几个问题:
- 不 你真的不能将 Row 转换为 Vector:Row 是
Spark SQL
理解的可能不同类型的集合。Vector
不是原生 spark sql 类型 - 您的 SQL 语句的内容与您试图通过
KMeans
实现的内容似乎不匹配:SQL 正在执行聚合。但是KMeans
需要一系列单独的数据点,形式为向量(封装Array[Double]
)。那么 - 为什么要向KMeans
操作提供sum
和average
?
此处仅解决#1:您需要按照以下方式做一些事情:
val doubVals = <rows rdd>.map{ row => row.getDouble("colname") }
val vector = Vectors.toDense{ doubVals.collect}
那么您就有了一个正确封装的 Array[Double]
(在 Vector 中),可以提供给 Kmeans
。