如何处理 Apache Spark 中不断变化的镶木地板架构

How to handle changing parquet schema in Apache Spark

我 运行 遇到一个问题,我在 S3 中将 Parquet 数据作为每日块(以 s3://bucketName/prefix/YYYY/MM/DD/ 的形式),但我无法从不同日期读取 AWS EMR Spark 中的数据,因为某些列类型不匹配,我得到许多异常之一,例如:

java.lang.ClassCastException: optional binary element (UTF8) is not a group

当某些文件中有一个具有值的数组类型时出现,但同一列在其他文件中可能具有 null 值,然后将其推断为字符串类型。

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal):
org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)

我在 S3 中有 JSON 格式的原始数据,我最初的计划是创建一个自动作业,它启动一个 EMR 集群,读取前一个日期的 JSON 数据,然后简单地将其作为镶木地板写回 S3。

JSON数据也分为日期,即键有日期前缀。阅读 JSON 工作正常。无论当前正在读取多少数据,都会从数据中推断出架构。

但是当写入parquet文件时,问题就来了。据我了解,当我用元数据文件编写镶木地板时,这些文件包含所有 parts/partitions 镶木地板文件的模式。在我看来,这也可以有不同的模式。当我禁用写入元数据时,据说 Spark 从给定 Parquet 路径中的第一个文件推断出整个模式,并假定它在其他文件中保持不变。

当一些应该是 double 类型的列在给定日期只有整数值时,从 JSON 中读取它们(这些数字是整数,没有浮点数)使得Spark 认为它是一个类型为 long 的列。即使我可以在写入 Parquet 文件之前将这些列转换为 double,这仍然不好,因为架构可能会更改,可以添加新列,并且无法跟踪它。

我看到有些人有同样的问题,但我还没有找到足够好的解决方案。

这方面的最佳做法或解决方案是什么?

这些是我用来将 parquet 写入 S3 的选项;关闭架构合并可以提高写回性能 - 它也可以解决您的问题

val PARQUET_OPTIONS = Map(
 "spark.sql.parquet.mergeSchema" -> "false",
 "spark.sql.parquet.filterPushdown" -> "true")

因为我每天从 JSON 读取数据块并写入每日 S3 文件夹中的 Parquet,在读取 JSON 或将容易出错的列转换为正确类型之前没有指定我自己的模式写入 Parquet 时,Spark 可能会根据数据实例中的值为不同日期的数据推断出不同的模式,并写入具有冲突模式的 Parquet 文件。

它可能不是完美的解决方案,但我发现使用不断发展的模式解决我的问题的唯一方法如下:

在我每天(更具体地说是每晚)批量处理前一天数据的 cron 作业之前,我创建了一个大部分为空值的虚拟对象。

我确保 ID 是可识别的,例如,由于真实数据具有唯一 ID,我将 "dummy" 字符串作为 ID 添加到虚拟数据对象。

然后我会为容易出错的类型的属性给出预期值,例如我会给出 floats/doubles 非零值,因此当编组为 JSON 时,它们肯定会有小数点分隔符,例如“0.2”而不是“0”(当编组为 JSON,具有 0 值的 doubles/floats 显示为“0”而不是“0.0”)。

字符串和布尔值以及整数工作正常,但除了 doubles/floats 我还需要将数组实例化为空数组和其他 classes/structs 的对象以及相应的空对象,这样它们就不会"null"-s,因为 Spark 将 null-s 作为字符串读取。


然后,如果我填写了所有必需的字段,我会将对象编组到 JSON 并将文件写入 S3。

然后我会在我的 Scala 批处理脚本中使用这些文件来读入它们,将模式保存到一个变量中,并在我读入真实 JSON 数据时将此模式作为参数提供以避免 Spark进行自己的模式推断。

这样我就知道所有的字段总是相同的类型,只有在添加新字段时才需要模式合并来加入模式。

当然,当添加易出错类型的新字段时,它会增加手动更新虚拟对象创建的缺点,但这目前是一个小缺点,因为它是我发现唯一可行的解​​决方案。

只需制作一个 rdd[String],其中每个字符串都是一个 json,当将 rdd 制作为数据帧时,使用 primitiveAsString 选项将所有数据类型制作为 String

 val binary_zip_RDD = sc.binaryFiles(batchHolder.get(i), minPartitions = 50000)
 // rdd[String]  each string is a json ,lowercased json
    val TransformedRDD = binary_zip_RDD.flatMap(kv => ZipDecompressor.Zip_open_hybrid(kv._1, kv._2, proccessingtimestamp))
 // now the schema of dataframe would be consolidate schema of all json strings
    val jsonDataframe_stream = sparkSession.read.option("primitivesAsString", true).json(TransformedRDD)

    println(jsonDataframe_stream.printSchema())


    jsonDataframe_stream.write.mode(SaveMode.Append).partitionBy(GetConstantValue.DEVICEDATE).parquet(ApplicationProperties.OUTPUT_DIRECTORY)