Spark 2.0 如何处理列的可空性?

How does Spark 2.0 handle column nullability?

在最近发布的 The Data Engineer's Guide to Apache Spark 中,作者声明(第 74 页):

"...when you define a schema where all columns are declared to not have null values - Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be hard to debug."

在查看笔记和以前的 JIRA 时,上面的说法似乎不再正确。

根据 SPARK-13740 and SPARK-15192,当在创建的 DataFrame 上定义架构时,似乎会强制执行可空性。

我能得到一些说明吗?我不再确定行为是什么。

长话短说我们不知道。的确,Spark 在强制 nullable 属性方面变得更加严格

但是考虑到 Spark 的复杂性(来宾语言的数量、库的大小、用于优化的低级机制的数量、可插入数据源以及相对较大的遗留代码池),确实不能保证相当有限最新版本中包含的安全检查涵盖了所有可能的场景。

不同的DataFrame创建过程对于空类型的处理方式不同。这并不是很简单,因为至少有三个不同的区域对空值的处理完全不同。

  1. 首先,SPARK-15192是关于RowEncoders的。在 RowEncoders 的情况下,不允许有空值,并且错误消息已得到改进。例如,通过 SparkSession.createDataFrame() 的两打左右重载,createDataFrame() 的很多实现基本上都是将 RDD 转换为 DataFrame。 在我下面的示例中,没有接受任何空值。因此,尝试使用类似于下面的 createDateFrame() 方法将 RDD 转换为 DataFrame 的操作,您将获得相同的结果...

    val nschema = StructType(Seq(StructField("colA", IntegerType, nullable = false), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = false), StructField("colD", IntegerType, nullable = true)))
    val intNullsRDD = sc.parallelize(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)))
    spark.createDataFrame(intNullsRDD, schema).show()
    

在 Spark 2.1.1 中,错误消息非常好。

17/11/23 21:30:37 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 6)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: The 0th field 'colA' of input row cannot be null.
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType) AS colA#73
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType)
   +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
         +- input[0, org.apache.spark.sql.Row, true]

单步执行代码,您可以看到发生这种情况的位置。在下面的 doGenCode() 方法中有验证。紧接着,当使用 val encoder = RowEncoder(schema) 创建 RowEncoder 对象时,该逻辑就开始了。

     @DeveloperApi
     @InterfaceStability.Evolving
     def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
     createDataFrame(rowRDD, schema, needsConversion = true)
    }

    private[sql] def createDataFrame(
      rowRDD: RDD[Row],
      schema: StructType,
      needsConversion: Boolean) = {
    // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
    // schema differs from the existing schema on any field data type.
    val catalystRows = if (needsConversion) {
      val encoder = RowEncoder(schema)
      rowRDD.map(encoder.toRow)
    } else {
      rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
      }
      val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
      Dataset.ofRows(self, logicalPlan)
    }

进一步执行此逻辑后,这里是 objects.scala 中改进的消息,这就是代码处理空值的地方。实际上错误消息被传递到 ctx.addReferenceObj(errMsg) 但你明白了。

 case class GetExternalRowField(
    child: Expression,
    index: Int,
    fieldName: String) extends UnaryExpression with NonSQLExpression {

  override def nullable: Boolean = false
  override def dataType: DataType = ObjectType(classOf[Object])
  override def eval(input: InternalRow): Any =
    throw new UnsupportedOperationException("Only code-generated evaluation is supported")

  private val errMsg = s"The ${index}th field '$fieldName' of input row cannot be null."

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // Use unnamed reference that doesn't create a local field here to reduce the number of fields
    // because errMsgField is used only when the field is null.
    val errMsgField = ctx.addReferenceObj(errMsg)
    val row = child.genCode(ctx)
    val code = s"""
      ${row.code}

      if (${row.isNull}) {
        throw new RuntimeException("The input external row cannot be null.");
      }

      if (${row.value}.isNullAt($index)) {
        throw new RuntimeException($errMsgField);
      }

      final Object ${ev.value} = ${row.value}.get($index);
     """
    ev.copy(code = code, isNull = "false")
  }
} 
  1. 从 HDFS 数据源中提取时会发生完全不同的情况。在这种情况下,当存在不可为 null 的列并且出现 null 时,不会出现错误消息。该列仍然接受 null 值。查看我创建的快速测试文件 "testFile.csv" 然后将其放入 hdfs hdfs dfs -put testFile.csv /data/nullTest

       |colA|colB|colC|colD| 
       |    |    |    |    |
       |    |   2|   2|   2|
       |    |   3|    |    |
       |   4|    |    |    |
    

当我使用相同的 nschema 架构从下面的文件中读取时,所有空白值都变为空值,即使该字段不可为空。有多种方法可以不同地处理空白,但这是默认设置。 csv 和 parquet 的结果相同。

val nschema = StructType(Seq(StructField("colA", IntegerType, nullable = true), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = true), StructField("colD", IntegerType, nullable = true)))
val jListNullsADF = spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)).asJava,nschema)
jListNullsADF.write.format("parquet").save("/data/parquetnulltest")
spark.read.format("parquet").schema(schema).load("/data/parquetnulltest").show()

+----+----+----+----+
|colA|colB|colC|colD|
+----+----+----+----+
|null|null|null|null|
|null|   2|   2|   2|
|null|null|   3|null|
|null|   4|null|   4|
+----+----+----+----+

允许空值的原因始于 DataFrameReader 创建,其中调用 DataFramerReader.scala 中的 baseRelationToDataFrame()。 SparkSession.scala 中的 baseRelationToDataFrame() 在方法中使用 QueryPlan class,而 QueryPlan 正在重新创建 StructType 始终具有可空字段 的方法 fromAttributes() 与原始模式基本相同,但强制为空。因此,当它返回时 RowEncoder(),它现在是原始模式的可空版本。

在 DataFrameReader.scala 的正下方您可以看到 baseRelationToDataFrame() 调用...

  @scala.annotation.varargs
  def load(paths: String*): DataFrame = {
    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }

在文件 SparkSession.scala 的正下方,您可以看到正在调用 Dataset.ofRows(self: SparkSession, lr: LogicalRelation) 方法,请密切注意 LogicalRelation 计划构造函数。

  def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
    Dataset.ofRows(self, LogicalRelation(baseRelation))
  }

在 Dataset.scala 中,已分析的 QueryPlan 对象的架构 属性 作为第三个参数传递,以在 new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)).

中创建数据集
  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }
}

在 QueryPlan.scala 中使用了 StructType.fromAttributes() 方法

 lazy val schema: StructType = StructType.fromAttributes(output)

最后在 StructType.scala 中,可为空的 属性 始终可为空。

  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))

关于查询计划因可空性而不同,我认为 LogicalPlan 完全有可能因列是否可为空而不同。许多信息被传递到该对象中,并且有很多后续逻辑来创建计划。但它 在实际写入数据帧时 未保持可为空 ,正如我们刚才看到的那样。

  1. 第三种情况是依赖于DataType。当您使用方法 createDataFrame(rows: java.util.List[Row], schema: StructType) 创建 DataFrame 时,它​​实际上会 在将 null 传递到不可为 null 的 IntegerType 字段 的地方创建零。你可以看到下面的例子...

      val schema = StructType(Seq(StructField("colA", IntegerType, nullable = false), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = false), StructField("colD", IntegerType, nullable = true))) 
      val jListNullsDF = spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)).asJava,schema)
      jListNullsDF.show() 
    
      +----+----+----+----+
      |colA|colB|colC|colD|
      +----+----+----+----+
      |   0|null|   0|null|
      |   2|null|   0|null|
      |   0|   3|   0|null|
      |   0|null|   0|   4|
      +----+----+----+----+
    

看起来 org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt() 中有用零替换空值的逻辑。但是,对于不可为 null 的 StringType 字段,空值的处理不那么优雅。

   val strschema = StructType(Seq(StructField("colA", StringType, nullable = false), StructField("colB", StringType, nullable = true), StructField("colC", StringType, nullable = false), StructField("colD", StringType, nullable = true)))
   val strNullsRDD = sc.parallelize(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row("r2colA",null,null,null),org.apache.spark.sql.Row(null,"r3colC",null,null),org.apache.spark.sql.Row(null,null,null,"r4colD")))
spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row("r2cA",null,null,null),org.apache.spark.sql.Row(null,"row3cB",null,null),org.apache.spark.sql.Row(null,null,null,"row4ColD")).asJava,strschema).show()

但下面是一条不太有用的错误消息,它没有指定字段的顺序位置...

java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)