Spark AQE Post-Shuffle partitions coalesce 没有按预期工作,甚至导致某些分区的数据倾斜。为什么?

Spark AQE Post-Shuffle partitions coalesce don't work as expected, and even make data skew in some partitions. Why?

我在我的 spark DF 上使用全局排序,当我启用 AQE 和 post-shuffle 合并时,排序操作后我的分区变得比以前更差。

    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
    "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"

我的查询在高层次上看起来是:

.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
  1. 可能导致偏差的列 -> 是的,我的数据分布不均,这就是我使用盐的原因。
  2. 我从Kafka读取数据,所以我使用Kafka分区+偏移列作为salt。
  3. 为什么在引擎盖下使用 reaprtitoinByRange 的排序对我没有帮助,我想启用 AQE? -> 现在我看到我的 Kafka 消息在大小上可能有太大的差异。所以我看到我的分区在范围重新分区后有接近相同数量的记录,但字节数仍然非常不均匀。
  4. 为什么我认为 AQE 必须帮助我? -> 我想创建许多小范围,即使我的数据倾斜也不会超过 ~50mb,因此 post shuffle coalesce 将能够将它们合并到目标大小 (256mb)。在我的情况下,最大 320mb 是可以的。

我的第一个假设是,即使范围很小,峰值也会太大。 但是我检查并确认按范围重新分区可以使我在记录中得到良好的分布,但大小不好。我有将近 200 个分区,它们的记录数量几乎相同,大小差异高达 9 倍,从 ~100Mb 到 ~900mb。 但是使用 AEQ 和重新分区到 18000 个小范围,最小分区为 18mib,最大分区为 1.8Gib。 这种情况比没有 AEQ 的情况要糟糕得多。 需要强调的是,我使用 Spark UI -> Details for Stage 选项卡中的指标来识别分区大小(以字节为单位),并且我有自己的记录日志。

所以我开始调试这个问题,但是AQE没有足够的输入输出日志 ShufflePartitionsUtil.coalescePartitions。 这就是为什么我将查询重写为 repartitionByRange.sortWithingPartitoins。和 fork Physical Plan optimization with additional logging。 我的日志告诉我,我最初的想法是正确的。

Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435

Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition  maxsize :312832323
Output partition min size :103832323

最小大小如此不同,因为最后一个分区的大小,这是预期的。 TRACE 日志级别显示 99% 的分区接近 290mib。

问题:

P.S。 我还想提一下,我的输入 Kafka 数据帧是 2160,甚至不是分布式分区 -> 一些分区可以比其他分区大 2 倍。从具有 720 个分区和 minPartitions 选项 * 3.

的 Kafka 主题中读取

https://www.mail-archive.com/dev@spark.apache.org/msg26851.html

答案在这里。

在缓存数据中启用 AQE 的最坏情况是不会丢失 using/reusing 缓存的机会,但如果 outputPartitioning 恰好在没有 AQE 的情况下匹配并且在之后不匹配 空气质量指数。发生这种情况的可能性很小。