Spark AQE Post-Shuffle partitions coalesce 没有按预期工作,甚至导致某些分区的数据倾斜。为什么?
Spark AQE Post-Shuffle partitions coalesce don't work as expected, and even make data skew in some partitions. Why?
我在我的 spark DF 上使用全局排序,当我启用 AQE 和 post-shuffle 合并时,排序操作后我的分区变得比以前更差。
"spark.sql.adaptive.enabled" -> "true",
"spark.sql.adaptive.coalescePartitions.enabled" -> "true",
"spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
"spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
"spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"
我的查询在高层次上看起来是:
.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
- 可能导致偏差的列 -> 是的,我的数据分布不均,这就是我使用盐的原因。
- 我从Kafka读取数据,所以我使用Kafka分区+偏移列作为salt。
- 为什么在引擎盖下使用 reaprtitoinByRange 的排序对我没有帮助,我想启用 AQE? -> 现在我看到我的 Kafka 消息在大小上可能有太大的差异。所以我看到我的分区在范围重新分区后有接近相同数量的记录,但字节数仍然非常不均匀。
- 为什么我认为 AQE 必须帮助我? -> 我想创建许多小范围,即使我的数据倾斜也不会超过 ~50mb,因此 post shuffle coalesce 将能够将它们合并到目标大小 (256mb)。在我的情况下,最大 320mb 是可以的。
我的第一个假设是,即使范围很小,峰值也会太大。
但是我检查并确认按范围重新分区可以使我在记录中得到良好的分布,但大小不好。我有将近 200 个分区,它们的记录数量几乎相同,大小差异高达 9 倍,从 ~100Mb 到 ~900mb。
但是使用 AEQ 和重新分区到 18000 个小范围,最小分区为 18mib,最大分区为 1.8Gib。
这种情况比没有 AEQ 的情况要糟糕得多。
需要强调的是,我使用 Spark UI -> Details for Stage 选项卡中的指标来识别分区大小(以字节为单位),并且我有自己的记录日志。
所以我开始调试这个问题,但是AQE没有足够的输入输出日志
ShufflePartitionsUtil.coalescePartitions
。
这就是为什么我将查询重写为 repartitionByRange.sortWithingPartitoins。和 fork Physical Plan optimization with additional logging。
我的日志告诉我,我最初的想法是正确的。
- map 和 write shuffle 阶段后的输入分区被分割得足够小
- 合并算法将它们收集到在字节分区中均匀分布的正确数字。
Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435
和
Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition maxsize :312832323
Output partition min size :103832323
最小大小如此不同,因为最后一个分区的大小,这是预期的。 TRACE 日志级别显示 99% 的分区接近 290mib。
但为什么 spark UI 显示如此不同的结果? ->
可能火花UI是错的? ->
也许吧,但是除了任务量大,任务的持续时间也太大了,这让我觉得spark UI还可以。
所以我的假设是我阶段的 MapOutputStatistics
有问题。但它总是坏掉还是只在我的情况下坏掉? ->
只有我的情况? -> 我做了一些检查以确认它。
-
- 我从 s3(块大小为 120mb 的镶木地板文件)读取了相同的数据集 -> 并且 AQE 按预期工作。 post shuffle coalesce return 给我 188,按大小、分区分布良好。重要的是要注意 s3 上的数据分布不均,但在读取期间 spark 将其拆分为 259 个接近 120mb 大小的分区,主要是因为 parquet 块大小为 120mb。
-
- 我从 Kafka 读取了相同的数据集,但从分区函数中排除了倾斜的列 -> 并且 AQE 按预期工作。 post shuffle coalesce return 给我 203,按大小、分区分布良好。
-
- 我尝试禁用缓存 -> 这没有任何结果。我使用缓存,只是为了避免从 kafka 中重复读取。因为按范围重新分区使用采样。
-
- 我尝试禁用 AQE 并将 18000 个分区写入 s3 -> 结果符合预期,与我在合并输入上的日志显示的相同:17999 个文件,最小的接近 8mib,最大的 56mib。
所有这些检查让我认为 MapOutputStatistics
仅对我而言是错误的。可能是如何与 Kafka 源关联或我的 Kafka 输入数据分布非常不均匀的问题。
问题:
- 所以有人知道我做错了什么吗?
- 在我的情况下,我可以如何处理输入数据以使 post 随机合并工作?
- 如果你觉得我说的对,欢迎评论。
P.S。
我还想提一下,我的输入 Kafka 数据帧是 2160,甚至不是分布式分区 -> 一些分区可以比其他分区大 2 倍。从具有 720 个分区和 minPartitions
选项 * 3.
的 Kafka 主题中读取
https://www.mail-archive.com/dev@spark.apache.org/msg26851.html
答案在这里。
在缓存数据中启用 AQE 的最坏情况是不会丢失
using/reusing 缓存的机会,但如果
outputPartitioning 恰好在没有 AQE 的情况下匹配并且在之后不匹配
空气质量指数。发生这种情况的可能性很小。
我在我的 spark DF 上使用全局排序,当我启用 AQE 和 post-shuffle 合并时,排序操作后我的分区变得比以前更差。
"spark.sql.adaptive.enabled" -> "true",
"spark.sql.adaptive.coalescePartitions.enabled" -> "true",
"spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
"spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
"spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"
我的查询在高层次上看起来是:
.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
- 可能导致偏差的列 -> 是的,我的数据分布不均,这就是我使用盐的原因。
- 我从Kafka读取数据,所以我使用Kafka分区+偏移列作为salt。
- 为什么在引擎盖下使用 reaprtitoinByRange 的排序对我没有帮助,我想启用 AQE? -> 现在我看到我的 Kafka 消息在大小上可能有太大的差异。所以我看到我的分区在范围重新分区后有接近相同数量的记录,但字节数仍然非常不均匀。
- 为什么我认为 AQE 必须帮助我? -> 我想创建许多小范围,即使我的数据倾斜也不会超过 ~50mb,因此 post shuffle coalesce 将能够将它们合并到目标大小 (256mb)。在我的情况下,最大 320mb 是可以的。
我的第一个假设是,即使范围很小,峰值也会太大。 但是我检查并确认按范围重新分区可以使我在记录中得到良好的分布,但大小不好。我有将近 200 个分区,它们的记录数量几乎相同,大小差异高达 9 倍,从 ~100Mb 到 ~900mb。 但是使用 AEQ 和重新分区到 18000 个小范围,最小分区为 18mib,最大分区为 1.8Gib。 这种情况比没有 AEQ 的情况要糟糕得多。 需要强调的是,我使用 Spark UI -> Details for Stage 选项卡中的指标来识别分区大小(以字节为单位),并且我有自己的记录日志。
所以我开始调试这个问题,但是AQE没有足够的输入输出日志
ShufflePartitionsUtil.coalescePartitions
。
这就是为什么我将查询重写为 repartitionByRange.sortWithingPartitoins。和 fork Physical Plan optimization with additional logging。
我的日志告诉我,我最初的想法是正确的。
- map 和 write shuffle 阶段后的输入分区被分割得足够小
- 合并算法将它们收集到在字节分区中均匀分布的正确数字。
Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435
和
Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition maxsize :312832323
Output partition min size :103832323
最小大小如此不同,因为最后一个分区的大小,这是预期的。 TRACE 日志级别显示 99% 的分区接近 290mib。
但为什么 spark UI 显示如此不同的结果? ->
可能火花UI是错的? ->
也许吧,但是除了任务量大,任务的持续时间也太大了,这让我觉得spark UI还可以。
所以我的假设是我阶段的
MapOutputStatistics
有问题。但它总是坏掉还是只在我的情况下坏掉? ->只有我的情况? -> 我做了一些检查以确认它。
-
- 我从 s3(块大小为 120mb 的镶木地板文件)读取了相同的数据集 -> 并且 AQE 按预期工作。 post shuffle coalesce return 给我 188,按大小、分区分布良好。重要的是要注意 s3 上的数据分布不均,但在读取期间 spark 将其拆分为 259 个接近 120mb 大小的分区,主要是因为 parquet 块大小为 120mb。
-
- 我从 Kafka 读取了相同的数据集,但从分区函数中排除了倾斜的列 -> 并且 AQE 按预期工作。 post shuffle coalesce return 给我 203,按大小、分区分布良好。
-
- 我尝试禁用缓存 -> 这没有任何结果。我使用缓存,只是为了避免从 kafka 中重复读取。因为按范围重新分区使用采样。
-
- 我尝试禁用 AQE 并将 18000 个分区写入 s3 -> 结果符合预期,与我在合并输入上的日志显示的相同:17999 个文件,最小的接近 8mib,最大的 56mib。
所有这些检查让我认为
MapOutputStatistics
仅对我而言是错误的。可能是如何与 Kafka 源关联或我的 Kafka 输入数据分布非常不均匀的问题。
问题:
- 所以有人知道我做错了什么吗?
- 在我的情况下,我可以如何处理输入数据以使 post 随机合并工作?
- 如果你觉得我说的对,欢迎评论。
P.S。
我还想提一下,我的输入 Kafka 数据帧是 2160,甚至不是分布式分区 -> 一些分区可以比其他分区大 2 倍。从具有 720 个分区和 minPartitions
选项 * 3.
https://www.mail-archive.com/dev@spark.apache.org/msg26851.html
答案在这里。
在缓存数据中启用 AQE 的最坏情况是不会丢失 using/reusing 缓存的机会,但如果 outputPartitioning 恰好在没有 AQE 的情况下匹配并且在之后不匹配 空气质量指数。发生这种情况的可能性很小。