Spark - Convert RDD[Vector] to DataFrame with variable columns
What is the best way to generalize the conversion from RDD[Vector] to DataFrame using Scala / Spark 1.6?
The input is a varying RDD[Vector]. The number of columns in the Vector can range from 1 to n across different RDDs.
I tried using the shapeless library, but it requires the number and types of the columns to be declared up front. For example:
val df = rddVector.map(_.toArray.toList)
  .collect {
    case t: List[Double] if t.length == 3 => t.toHList[Double :: Double :: Double :: HNil].get.tupled.productArity
  }
  .toDF("column_1", "column_2", "column_3")
Thanks!
This worked for me.
// Imports needed for the snippet below
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Create a vector RDD with rows of varying length
val vectorRDD = sc.parallelize(Seq(Seq(123L, 345L), Seq(567L, 789L), Seq(567L, 789L, 233334L))).
  map(s => Vectors.dense(s.toSeq.map(_.toString.toDouble).toArray))

// Calculate the maximum vector length, which determines the number of schema columns
val vectorLength = vectorRDD.map(x => x.toArray.length).max()

// Build the schema dynamically: one nullable DoubleType column per slot
var schema = new StructType()
var i = 0
while (i < vectorLength) {
  schema = schema.add(StructField(s"val${i}", DoubleType, true))
  i = i + 1
}

// Build a row RDD in which every row has the same arity,
// zero-padding shorter vectors on the right
val rowRDD = vectorRDD.map { x =>
  val row = new Array[Double](vectorLength)
  val newRow = x.toArray
  System.arraycopy(newRow, 0, row, 0, newRow.length)
  Row.fromSeq(row)
}

// Create the DataFrame from the padded rows and the dynamic schema
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)
Output:
root
|-- val0: double (nullable = true)
|-- val1: double (nullable = true)
|-- val2: double (nullable = true)
+-----+-----+--------+
| val0| val1| val2|
+-----+-----+--------+
|123.0|345.0| 0.0|
|567.0|789.0| 0.0|
|567.0|789.0|233334.0|
+-----+-----+--------+
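As a side note, the zero-fill-then-arraycopy step in the answer can also be expressed with Scala's `Seq.padTo`, which pads a collection to a given length with a fill value. Below is a minimal sketch of just that padding step on plain Scala collections (no Spark required), using the same sample data as the answer; the variable names are only illustrative:

```scala
// Sample rows of varying arity, mirroring the vectors in the answer
val rows = Seq(
  Seq(123.0, 345.0),
  Seq(567.0, 789.0),
  Seq(567.0, 789.0, 233334.0)
)

// Maximum arity across all rows, as computed in the answer
val maxLen = rows.map(_.length).max

// Pad shorter rows on the right with 0.0 so every row has the same arity
val padded = rows.map(_.padTo(maxLen, 0.0))

padded.foreach(r => println(r.mkString(", ")))
// 123.0, 345.0, 0.0
// 567.0, 789.0, 0.0
// 567.0, 789.0, 233334.0
```

Inside the Spark `map`, the equivalent would be `Row.fromSeq(x.toArray.toSeq.padTo(vectorLength, 0.0))`, which avoids the mutable array and `System.arraycopy` entirely.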