从包含 Option[T] 的行创建 DataFrame 的问题

Problems to create DataFrame from Rows containing Option[T]

我正在将一些代码从 Spark 1.6 迁移到 Spark 2.1 并遇到以下问题:

这在 Spark 1.6 中完美运行

import org.apache.spark.sql.types.{LongType, StructField, StructType}  

val schema = StructType(Seq(StructField("i", LongType,nullable=true)))    
val rows = sparkContext.parallelize(Seq(Row(Some(1L))))
sqlContext.createDataFrame(rows,schema).show

Spark 2.1.1 中的相同代码:

import org.apache.spark.sql.types.{FloatType, LongType, StructField, StructType}

val schema = StructType(Seq(StructField("i", LongType,nullable=true)))
val rows = ss.sparkContext.parallelize(Seq(Row(Some(1L))))
ss.createDataFrame(rows,schema).show

给出以下运行时异常:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 72, i89203.sbb.ch, executor 9): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.Some is not a valid external type for schema of bigint

那么,如果我想使用可为空的 Long 而不是使用 Option[Long],我应该如何将此类代码转换为 Spark 2.x?

错误消息很清楚,说在需要 bigint 时使用 Some

scala.Some is not a valid external type for schema of bigint

所以你需要使用Option结合getOrElse这样我们就可以在Optionreturnsnullpointer时定义null。以下代码应该适合您

val sc = ss.sparkContext
val sqlContext = ss.sqlContext
val schema = StructType(Seq(StructField("i", LongType,nullable=true)))
val rows = sc.parallelize(Seq(Row(Option(1L) getOrElse(null))))
sqlContext.createDataFrame(rows,schema).show

希望这个回答对您有所帮助

实际上有一个 JIRA SPARK-19056 关于这个问题,但实际上不是一个。

所以这种行为是故意的。

Allowing Option in Row is never documented and brings a lot of troubles when we apply the encoder framework to all typed operations. Since Spark 2.0, please use Dataset for typed operation/custom objects. e.g.

val ds = Seq(1 -> None, 2 -> Some("str")).toDS
ds.toDF // schema: <_1: int, _2: string>