使用 Spark 展平嵌套的 ORC 文件 - 性能问题
Flattening a nested ORC file with Spark - Performance issue
我们在读取嵌套的 ORC 文件时遇到严重的性能问题。
这是我们的 ORC 架构:
|-- uploader: string (nullable = true)
|-- email: string (nullable = true)
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- startTime: string (nullable = true)
| | |-- endTime: string (nullable = true)
| | |-- val1: string (nullable = true)
| | |-- val2: string (nullable = true)
| | |-- val3: integer (nullable = true)
| | |-- val4: integer (nullable = true)
| | |-- val5: integer (nullable = true)
| | |-- val6: integer (nullable = true)
“数据”数组可能包含 75K 个对象。
在我们的 spark 应用程序中,我们将这个 ORC 展平,如下所示:
val dataFrame = spark.read.orc(files: _*)
val withData = dataFrame.withColumn("data", explode(dataFrame.col("data")))
val withUploader = withData.select($"uploader", $"data")
val allData = withUploader
.withColumn("val_1", $"data.val1")
.withColumn("val_2", $"data.val2")
.withColumn("val_3", $"data.val3")
.withColumn("val_4", $"data.val4")
.withColumn("val_5", $"data.val5")
.withColumn("val_6", $"data.val6")
.withColumn("utc_start_time", timestampUdf($"data.startTime"))
.withColumn("utc_end_time", timestampUdf($"data.endTime"))
allData.drop("data")
扁平化过程似乎是一个非常繁重的操作:
读取一个包含 20 条记录的 2MB ORC 文件,每条记录都包含一个包含 75K 对象的数据数组,需要数小时的处理时间。读取文件并在不展平的情况下收集文件,需要 22 秒。
有没有办法让spark处理数据的速度更快?
我会尽量避免大 explodes
完全。数组中有 75K 个元素:
- 您为每个
Row
创建了 75K Row
个对象。这是一项巨大的分配工作。
- 您重复了
uploaded
和 email
75K 次。短期内它将引用相同的数据,但一旦数据以内部格式进行序列化和反序列化,它们就会指向不同的对象,从而有效地增加内存需求。
根据您要应用的转换,使用 UDF 处理整个数组的效率可能会高得多。
如果这对某人有帮助,我发现使用 flatmap 展平数据比使用 explode 快得多:
dataFrame.as[InputFormat].flatMap(r => r.data.map(v => OutputFormat(v, r.tenant)))
性能提升非常显着。
处理一个包含 20 条记录的文件,每条记录包含一个 250K 行的数组 - 使用分解实现需要 8 小时,使用平面图实现 - 7 分钟 (!)
我们在读取嵌套的 ORC 文件时遇到严重的性能问题。
这是我们的 ORC 架构:
|-- uploader: string (nullable = true)
|-- email: string (nullable = true)
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- startTime: string (nullable = true)
| | |-- endTime: string (nullable = true)
| | |-- val1: string (nullable = true)
| | |-- val2: string (nullable = true)
| | |-- val3: integer (nullable = true)
| | |-- val4: integer (nullable = true)
| | |-- val5: integer (nullable = true)
| | |-- val6: integer (nullable = true)
“数据”数组可能包含 75K 个对象。
在我们的 spark 应用程序中,我们将这个 ORC 展平,如下所示:
val dataFrame = spark.read.orc(files: _*)
val withData = dataFrame.withColumn("data", explode(dataFrame.col("data")))
val withUploader = withData.select($"uploader", $"data")
val allData = withUploader
.withColumn("val_1", $"data.val1")
.withColumn("val_2", $"data.val2")
.withColumn("val_3", $"data.val3")
.withColumn("val_4", $"data.val4")
.withColumn("val_5", $"data.val5")
.withColumn("val_6", $"data.val6")
.withColumn("utc_start_time", timestampUdf($"data.startTime"))
.withColumn("utc_end_time", timestampUdf($"data.endTime"))
allData.drop("data")
扁平化过程似乎是一个非常繁重的操作: 读取一个包含 20 条记录的 2MB ORC 文件,每条记录都包含一个包含 75K 对象的数据数组,需要数小时的处理时间。读取文件并在不展平的情况下收集文件,需要 22 秒。
有没有办法让spark处理数据的速度更快?
我会尽量避免大 explodes
完全。数组中有 75K 个元素:
- 您为每个
Row
创建了 75KRow
个对象。这是一项巨大的分配工作。 - 您重复了
uploaded
和email
75K 次。短期内它将引用相同的数据,但一旦数据以内部格式进行序列化和反序列化,它们就会指向不同的对象,从而有效地增加内存需求。
根据您要应用的转换,使用 UDF 处理整个数组的效率可能会高得多。
如果这对某人有帮助,我发现使用 flatmap 展平数据比使用 explode 快得多:
dataFrame.as[InputFormat].flatMap(r => r.data.map(v => OutputFormat(v, r.tenant)))
性能提升非常显着。
处理一个包含 20 条记录的文件,每条记录包含一个 250K 行的数组 - 使用分解实现需要 8 小时,使用平面图实现 - 7 分钟 (!)