Spark unable to read DECIMAL columns in Parquet files written by AvroParquetWriter

I have some Parquet files that were written with AvroParquetWriter (by the Kafka Connect S3 connector).

One of the columns in the files, aseg_lat, has the schema DECIMAL(9, 7).
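
For context, here is a minimal sketch of how such a file could be produced with AvroParquetWriter directly. This is only an illustration of the encoding, not the connector's actual code; the output path, record value, and record wrapper are made up, and the Avro schema simply mirrors the aseg_lat column. A bytes-backed Avro decimal is written as Parquet BINARY annotated with the DECIMAL logical type, which matches the "Found: BINARY" in the Spark error further down.

    import java.math.BigDecimal
    import java.nio.ByteBuffer
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecord}
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroParquetWriter

    // Avro schema mirroring the aseg_lat column: bytes-backed DECIMAL(9, 7)
    val avroSchema = new Schema.Parser().parse(
      """{
        |  "type": "record", "name": "row",
        |  "fields": [
        |    {"name": "aseg_lat",
        |     "type": {"type": "bytes", "logicalType": "decimal",
        |              "precision": 9, "scale": 7}}
        |  ]
        |}""".stripMargin)

    val writer = AvroParquetWriter
      .builder[GenericRecord](new Path("/tmp/example.parquet"))  // illustrative path
      .withSchema(avroSchema)
      .build()

    // Avro encodes a bytes decimal as the big-endian two's-complement unscaled value
    val record = new GenericData.Record(avroSchema)
    record.put("aseg_lat", ByteBuffer.wrap(new BigDecimal("12.3456789").unscaledValue().toByteArray))
    writer.write(record)
    writer.close()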

I can read that column just fine with PyArrow and PrestoSQL.

Trying to read it with Spark 3.0.0 running on AWS EMR, I get the following error:

scala> var df2 = df.select("aseg_lat")
df2: org.apache.spark.sql.DataFrame = [aseg_lat: decimal(9,7)]

scala> df2.show()
20/08/25 12:03:35 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
20/08/25 12:04:35 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 448, ip-172-30-2-50.ec2.internal, executor 8): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file <redacted>. Column: [aseg_lat], Expected: decimal(9,7), Found: BINARY
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:213)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:122)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon.hasNext(DataSourceScanExec.scala:559)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:298)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:603)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:268)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:285)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:183)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:122)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:207)
    ... 20 more

20/08/25 12:04:38 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 451, ip-172-30-2-50.ec2.internal, executor 5): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file <redacted>. Column: [aseg_lat], Expected: decimal(9,7), Found: BINARY
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:213)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:122)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon.hasNext(DataSourceScanExec.scala:559)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:298)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:603)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:268)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:285)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:183)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:122)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:207)
    ... 20 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2175)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:2124)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:2123)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:990)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:990)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:990)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2355)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2304)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2293)
  at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:792)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3664)
  at org.apache.spark.sql.Dataset.$anonfun$head(Dataset.scala:2737)
  at org.apache.spark.sql.Dataset.$anonfun$withAction(Dataset.scala:3655)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:106)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:207)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:88)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3653)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2737)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2944)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:864)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:823)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:832)
  ... 47 elided
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file <redacted>. Column: [aseg_lat], Expected: decimal(9,7), Found: BINARY
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:213)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:122)
  at org.apache.spark.sql.execution.FileSourceScanExec$$anon.hasNext(DataSourceScanExec.scala:559)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:345)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:298)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:603)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:268)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:285)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:183)
  at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:122)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:207)
  ... 20 more

I also tried using the Hive SerDe instead, by setting spark.sql.hive.convertMetastoreParquet to false. That lets me read the DECIMAL column, but reads then start failing on other columns, such as timestamps (error shown after the sketch below).
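
A minimal sketch of how that setting was applied in spark-shell (the table name below is a placeholder, not the real one):

    // Fall back to the Hive SerDe instead of Spark's native Parquet reader
    spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

    // "my_db.my_table" stands in for the actual Hive metastore table
    spark.sql("SELECT aseg_lat FROM my_db.my_table").show()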

20/08/25 12:28:34 WARN DAGScheduler: Broadcasting large task binary with size 8.7 MiB
20/08/25 12:28:37 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 7, ip-172-30-2-50.ec2.internal, executor 6): java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject(TableReader.scala:468)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$adapted(TableReader.scala:467)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject(TableReader.scala:493)
    at scala.collection.Iterator$$anon.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon.next(Iterator.scala:459)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:346)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

20/08/25 12:28:39 ERROR TaskSetManager: Task 0 in stage 4.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 10, ip-172-30-2-50.ec2.internal, executor 6): java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject(TableReader.scala:468)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$adapted(TableReader.scala:467)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject(TableReader.scala:493)
    at scala.collection.Iterator$$anon.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon.next(Iterator.scala:459)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:346)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2175)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:2124)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:2123)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:990)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:990)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:990)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2355)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2304)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2293)
  at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:792)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3664)
  at org.apache.spark.sql.Dataset.$anonfun$head(Dataset.scala:2737)
  at org.apache.spark.sql.Dataset.$anonfun$withAction(Dataset.scala:3655)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:106)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:207)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:88)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3653)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2737)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2944)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:864)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:823)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:832)
  ... 47 elided
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
  at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
  at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject(TableReader.scala:468)
  at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$adapted(TableReader.scala:467)
  at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject(TableReader.scala:493)
  at scala.collection.Iterator$$anon.next(Iterator.scala:459)
  at scala.collection.Iterator$$anon.next(Iterator.scala:459)
  at scala.collection.Iterator$$anon.next(Iterator.scala:459)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd(SparkPlan.scala:346)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal(RDD.scala:872)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$adapted(RDD.scala:872)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Another observation: changing DECIMAL(9, 7) to DECIMAL(x, 7), where x > 19, allows Spark to read the column. That is not a viable solution for me, though, since I have multiple TB of historical data written with DECIMAL(9, 7) that I would then need to reprocess.

How can I read the DECIMAL columns written by AvroParquetWriter from Spark?

Disabling Spark's vectorized Parquet reader lets Spark read such columns without issue; this was verified on both Spark 3.0.0 and Spark 2.4.4. (The vectorized reader in these versions apparently cannot convert a low-precision decimal stored as Parquet BINARY, which is how AvroParquetWriter encodes a bytes-backed decimal, while the non-vectorized reader handles it.)

That is, set spark.sql.parquet.enableVectorizedReader to false in the SparkSession configuration or in spark-defaults.
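
For example, a minimal sketch in spark-shell (the S3 path is illustrative):

    // Disable the vectorized Parquet reader for this session...
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

    // ...or set it cluster-wide in spark-defaults / at launch:
    //   spark-shell --conf spark.sql.parquet.enableVectorizedReader=false

    // With the non-vectorized reader the DECIMAL(9, 7) column reads normally.
    val df = spark.read.parquet("s3://my-bucket/path/")  // illustrative path
    df.select("aseg_lat").show()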

Thanks to @mazaneicha for suggesting this option.