使用 Spark 展平嵌套的 ORC 文件 - 性能问题

Flattening a nested ORC file with Spark - Performance issue

我们在读取嵌套的 ORC 文件时遇到严重的性能问题。

这是我们的 ORC 架构:

|-- uploader: string (nullable = true)
|-- email: string (nullable = true)
|-- data: array (nullable = true)
|    |-- element: struct (containsNull = true) 
|    |    |-- startTime: string (nullable = true)
|    |    |-- endTime: string (nullable = true)
|    |    |-- val1: string (nullable = true)
|    |    |-- val2: string (nullable = true)
|    |    |-- val3: integer (nullable = true)
|    |    |-- val4: integer (nullable = true)
|    |    |-- val5: integer (nullable = true)
|    |    |-- val6: integer (nullable = true)

“数据”数组可能包含 75K 个对象。

在我们的 spark 应用程序中,我们将这个 ORC 展平,如下所示:

val dataFrame = spark.read.orc(files: _*)
val withData = dataFrame.withColumn("data", explode(dataFrame.col("data")))
val withUploader = withData.select($"uploader", $"data")
val allData = withUploader
  .withColumn("val_1", $"data.val1")
  .withColumn("val_2", $"data.val2")
  .withColumn("val_3", $"data.val3")
  .withColumn("val_4", $"data.val4")
  .withColumn("val_5", $"data.val5")
  .withColumn("val_6", $"data.val6")
  .withColumn("utc_start_time", timestampUdf($"data.startTime"))
  .withColumn("utc_end_time", timestampUdf($"data.endTime"))

allData.drop("data")

扁平化过程似乎是一个非常繁重的操作: 读取一个包含 20 条记录的 2MB ORC 文件,每条记录都包含一个包含 75K 对象的数据数组,需要数小时的处理时间。读取文件并在不展平的情况下收集文件,需要 22 秒。

有没有办法让spark处理数据的速度更快?

我会尽量避免大 explodes 完全。数组中有 75K 个元素:

  • 您为每个 Row 创建了 75K Row 个对象。这是一项巨大的分配工作。
  • 您重复了 uploadedemail 75K 次。短期内它将引用相同的数据,但一旦数据以内部格式进行序列化和反序列化,它们就会指向不同的对象,从而有效地增加内存需求。

根据您要应用的转换,使用 UDF 处理整个数组的效率可能会高得多。

如果这对某人有帮助,我发现使用 flatmap 展平数据比使用 explode 快得多:

dataFrame.as[InputFormat].flatMap(r => r.data.map(v => OutputFormat(v, r.tenant)))

性能提升非常显着。

处理一个包含 20 条记录的文件,每条记录包含一个 250K 行的数组 - 使用分解实现需要 8 小时,使用平面图实现 - 7 分钟 (!)