Spark: Parquet DataFrame operations fail when forcing schema on read

(Spark 2.0.2)

The problem appears when you have Parquet files with different schemas and force a schema during the read. Even though you can print the schema and run show() fine, you cannot apply any filtering logic on the missing columns.

Here are two sample schemas:

// assuming you are running this code in a spark REPL
import spark.implicits._

case class Foo(i: Int)
case class Bar(i: Int, j: Int) 

So Bar includes all the fields of Foo, plus one more (j). In real life this comes up when you start with schema Foo and later decide you need more fields, ending up with schema Bar.

Let's simulate the two different Parquet files.

// assuming you are on a Mac or Linux OS
spark.createDataFrame(Foo(1)::Nil).write.parquet("/tmp/foo")
spark.createDataFrame(Bar(1,2)::Nil).write.parquet("/tmp/bar")

What we want here is to always read the data using the more generic schema Bar. That is, rows written with schema Foo should have j as null.

Case 1: we read a mix of both schemas

spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").show()
+---+----+
|  i|   j|
+---+----+
|  1|   2|
|  1|null|
+---+----+


spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").filter($"j".isNotNull).show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

Case 2: we only have Bar data

spark.read.parquet("/tmp/bar").show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

Case 3: we only have Foo data

scala> spark.read.parquet("/tmp/foo").show()
+---+
|  i|
+---+
|  1|
+---+

The problematic case is 3, where the resulting schema is of type Foo and not of type Bar. Since we migrated to schema Bar, we want to always get schema Bar from our data (old and new).

The suggested solution is to define the schema programmatically so that it is always Bar. Let's see how to do that:

val barSchema = org.apache.spark.sql.Encoders.product[Bar].schema
//barSchema: org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false)) 

Running show() works fine:

scala> spark.read.schema(barSchema).parquet("/tmp/foo").show()
+---+----+
|  i|   j|
+---+----+
|  1|null|
+---+----+

However, if you try to filter on the missing column j, things fail:

scala> spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()
17/09/07 18:13:50 ERROR Executor: Exception in task 0.0 in stage 230.0 (TID 481)
java.lang.IllegalArgumentException: Column [j] was not found in schema!
    at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
    at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
    at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
    at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader.apply(ParquetFileFormat.scala:381)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader.apply(ParquetFileFormat.scala:355)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:168)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This worked fine on Spark 1.6. The schema retrieval was changed, and a HiveContext was used:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val barSchema = ScalaReflection.schemaFor[Bar].dataType.asInstanceOf[StructType]
println(s"barSchema: $barSchema")
hiveContext.read.schema(barSchema).parquet("tmp/foo").filter($"j".isNotNull).show()

The result is:

barSchema: StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false))
+---+----+
|  i|   j|
+---+----+
|  1|null|
+---+----+

What worked for me was to use the createDataFrame API with an RDD[Row] and the new schema (at least with the new columns made nullable).

// Make the columns nullable (probably you don't need to make them all nullable)
val barSchemaNullable = org.apache.spark.sql.types.StructType(
   barSchema.map(_.copy(nullable = true)).toArray)

// We create the df (but this is not what you want to use, since it still has the same issue)
val df = spark.read.schema(barSchemaNullable).parquet("/tmp/foo")

// Here is the final API that gives a working DataFrame
val fixedDf = spark.createDataFrame(df.rdd, barSchemaNullable)

// The filter no longer throws; the result below is empty because the only
// row under /tmp/foo has j = null, so isNotNull filters it out.
fixedDf.filter($"j".isNotNull).show()

+---+---+
|  i|  j|
+---+---+
+---+---+

The issue is caused by the Parquet filter pushdown, which was not handled correctly in parquet-mr versions prior to 1.9.0.

You can check https://issues.apache.org/jira/browse/PARQUET-389 for more details.
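
One additional workaround, not mentioned in the original answers and only a sketch based on the diagnosis above, is to disable Parquet filter pushdown so the filter is evaluated by Spark rather than by parquet-mr (at the cost of losing row-group skipping):

// Sketch: turn off Parquet filter pushdown so parquet-mr never sees a
// predicate that references the missing column j.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()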

Alternatively, you can upgrade the parquet-mr version, or add a new column and base the filter on that column.

For example:

import org.apache.spark.sql.functions.{lit, when}

val dfNew = df.withColumn("new_j", when($"j".isNotNull, $"j").otherwise(lit(null)))
dfNew.filter($"new_j".isNotNull)
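
A more general variant of the same idea is sketched below; withTargetSchema is a hypothetical helper, not part of Spark or the original answers. It reads the files with whatever schema they have and then adds any columns of the target schema that are missing as typed nulls, so filters always reference real columns:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: align a DataFrame to a target schema by adding the
// missing columns as typed nulls and reordering to the target column order.
def withTargetSchema(df: DataFrame, target: StructType): DataFrame = {
  val existing = df.columns.toSet
  val filled = target.fields.foldLeft(df) { (acc, field) =>
    if (existing.contains(field.name)) acc
    else acc.withColumn(field.name, lit(null).cast(field.dataType))
  }
  filled.select(target.fieldNames.map(col): _*)
}

// j is now a computed (all-null) column rather than a Parquet column, so the
// predicate is not pushed down against a column that is absent from the file.
val fooAsBar = withTargetSchema(spark.read.parquet("/tmp/foo"), barSchema)
fooAsBar.filter($"j".isNotNull).show()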