如何使用 Apache Spark 将混合的 Parquet 模式加载到 DataFrame 中?
How to load mixed Parquet schema into DataFrame using Apache Spark?
我有一个 Spark 作业,不断将 Parquet 文件上传到 S3(带分区)。
这些文件都具有相同的镶木地板架构。
最近更改了其中一个字段类型(从字符串到长),因此某些分区的镶木地板模式是混合的。
具有两种类型的混合数据的地方现在无法读取某些内容。
虽然看起来我可以执行:sqlContext.read.load(path)
尝试在 DataFrame 上应用任何提取操作时(例如 collect
),操作失败 ParquetDecodingException
我打算迁移数据并重新格式化但无法将混合内容读入 DataFrame。
如何使用 Apache Spark 将混合分区加载到 DataFrames 或任何其他 Spark 结构中?
以下是 ParquetDecodingException 跟踪:
scala> df.collect
[Stage 1:==============> (1 + 3) / 4]
WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, 172.1.1.1, executor 0): org.apache.parquet.io.ParquetDecodingException:
Can not read value at 1 in block 0 in file
s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10/part-00000-6e4f07e4-3d89-4fad-acdf-37054107dc39.snappy.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:166)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
据我所知,您不能混合使用具有相同字段但类型不同的 2 个模式。因此我能想到的唯一解决办法是:
List files of partition
将每个文件重新写入新位置并transform the data to the right schame
- 如果原始数据被分区,则需要再次通过才能恢复分区。
这是因为逐个文件重写数据会覆盖分区。
- 检查您是否可以将所有新分区读取为正确的架构。
- 删除 "bad" 分区并改为复制 tmp 分区
还有一个思路:不改变现有字段的类型(field_string),而是添加一个新的long类型的字段(field_long)并更新读取数据的代码像这样(伪代码)并启用模式合并。我相信它默认启用,但这是一个明确说明它的好例子:
sqlContext.read.option("mergeSchema", "true").parquet(<parquet_file>)
...
if isNull(field_long)
field_value_long = field_string.value.to_long
else
field_value_long = field_long.value
我有一个 Spark 作业,不断将 Parquet 文件上传到 S3(带分区)。
这些文件都具有相同的镶木地板架构。
最近更改了其中一个字段类型(从字符串到长),因此某些分区的镶木地板模式是混合的。
具有两种类型的混合数据的地方现在无法读取某些内容。
虽然看起来我可以执行:sqlContext.read.load(path)
尝试在 DataFrame 上应用任何提取操作时(例如 collect
),操作失败 ParquetDecodingException
我打算迁移数据并重新格式化但无法将混合内容读入 DataFrame。
如何使用 Apache Spark 将混合分区加载到 DataFrames 或任何其他 Spark 结构中?
以下是 ParquetDecodingException 跟踪:
scala> df.collect
[Stage 1:==============> (1 + 3) / 4]
WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, 172.1.1.1, executor 0): org.apache.parquet.io.ParquetDecodingException:
Can not read value at 1 in block 0 in file
s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10/part-00000-6e4f07e4-3d89-4fad-acdf-37054107dc39.snappy.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:166)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
据我所知,您不能混合使用具有相同字段但类型不同的 2 个模式。因此我能想到的唯一解决办法是:
List files of partition
将每个文件重新写入新位置并transform the data to the right schame
- 如果原始数据被分区,则需要再次通过才能恢复分区。
这是因为逐个文件重写数据会覆盖分区。 - 检查您是否可以将所有新分区读取为正确的架构。
- 删除 "bad" 分区并改为复制 tmp 分区
还有一个思路:不改变现有字段的类型(field_string),而是添加一个新的long类型的字段(field_long)并更新读取数据的代码像这样(伪代码)并启用模式合并。我相信它默认启用,但这是一个明确说明它的好例子:
sqlContext.read.option("mergeSchema", "true").parquet(<parquet_file>)
...
if isNull(field_long)
field_value_long = field_string.value.to_long
else
field_value_long = field_long.value