如何将RDD解析为Dataframe

How to parse RDD to Dataframe

我正在尝试将 RDD[Seq[String]] 解析为 Dataframe。 虽然它是一个字符串序列,但它们可以有更具体的类型,如 Int、Boolean、Double、String 等等。 例如,一行可以是:

"hello", "1", "bye", "1.1"
"hello1", "11", "bye1", "2.1"
...

另一个执行可能有不同的列数。

第一列将始终是一个字符串,第二列是一个整数,依此类推,并且它将始终以这种方式出现。另一方面,一个执行可能有五个元素的序列,而其他执行可能有 2000 个元素,所以这取决于执行。在每次执行中定义列类型的名称。

要做到这一点,我可以有这样的东西:

//I could have a parameter to generate the StructType dinamically.
def getSchema(): StructType = {
  var schemaArray = scala.collection.mutable.ArrayBuffer[StructField]()
  schemaArray += StructField("col1" , IntegerType, true)
  schemaArray += StructField("col2" , StringType, true)
  schemaArray += StructField("col3" , DoubleType, true)
  StructType(schemaArray)
}

//Array of Any?? it doesn't seem the best option!!
val l1: Seq[Any] = Seq(1,"2", 1.1 )
val rdd1 = sc.parallelize(l1).map(Row.fromSeq(_))

val schema = getSchema()
val df = sqlContext.createDataFrame(rdd1, schema)
df.show()
df.schema

我一点也不喜欢拥有 Any 的 Seq,但这确实是我所拥有的。还有机会??

另一方面,我在想我有类似于 CSV 的东西,我可以创建一个。使用 spark 有一个库可以读取 CSV 和 return 一个推断类型的数据框。如果我已经有一个 RDD[String] 是否可以调用它?

由于每次执行都会更改列数,因此我建议使用 CSV 选项,并将分隔符设置为 space 或其他设置。这样 spark 就会为您找出列类型。

更新:

既然你提到你从 HBase 读取数据,一种方法是将 HBase 行转换为 JSON 或 CSV,然后将 RDD 转换为数据帧:

val jsons = hbaseContext.hbaseRDD(tableName, scan).map{case (_, r) =>
  val currentJson = new JSONObject
  val cScanner = r.cellScanner
  while (cScanner.advance) {
    currentJson.put(Bytes.toString(cScanner.current.getQualifierArray, cScanner.current.getQualifierOffset, cScanner.current.getQualifierLength),
      Bytes.toString(cScanner.current.getValueArray, cScanner.current.getValueOffset, cScanner.current.getValueLength))
  }
  currentJson.toString
}
val df = spark.read.json(spark.createDataset(jsons))

可以对 CSV 执行类似的操作。