如何将 VertexRDD 转换为 DataFrame

how to convert VertexRDD to DataFrame

我有一个 VertexRDD[DenseVector[Double]],我想将它转换为数据帧。我不明白如何将 DenseVector 中的值映射到数据框中的新列。

我正在尝试将架构指定为:

val schemaString = "id prop1 prop2 prop3 prop4 prop5 prop6 prop7"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

我认为一个选项是将我的 VertexRDD - 其中 breeze.linalg.DenseVector 包含所有值 - 转换为 RDD[Row],以便我最终可以创建一个数据框,如:

val myRDD = myvertexRDD.map(f => Row(f._1, f._2.toScalaVector().toSeq))
val mydataframe = SQLContext.createDataFrame(myRDD, schema)

但是我得到了

// scala.MatchError: 20502 (of class java.lang.Long)

欢迎任何提示

一种处理方法:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType, DoubleType}

val rows = myvertexRDD.map{
  case(id, v) => Row.fromSeq(id +: v.toArray)
}

val schema = StructType(
  StructField("id", LongType, false) +: 
  (1 to 7).map(i => StructField(s"prop$i", DoubleType, false)))

val df = sqlContext.createDataFrame(rows, schema)

备注:

  • 声明的类型必须与实际类型匹配。您不能声明字符串并传递 long 或 double
  • 行的结构必须与声明的结构匹配。在您的情况下,您尝试使用 LongVector[Double] 创建行,但声明 8 列