为什么我的 shuffle partition 在 group by 操作时不是 200(default)? (火花 2.4.5)

Why my shuffle partition is not 200(default) during group by operation? (Spark 2.4.5)

我是 spark 的新手,正在尝试了解它的内部结构。所以, 我正在从 s3 读取一个 50MB 的小镶木地板文件并执行分组,然后保存回 s3。 当我观察 Spark UI 时,我可以看到为此创建了 3 个阶段,

阶段 0:加载(1 个任务)

阶段 1:用于分组的 shufflequery 阶段(12 个任务)

阶段 2:保存 (coalescedshufflereader)(26 个任务)

代码示例:

df = spark.read.format("parquet").load(src_loc)
df_agg = df.groupby(grp_attribute)\                             
 .agg(F.sum("no_of_launches").alias("no_of_launchesGroup")
df_agg.write.mode("overwrite").parquet(target_loc)

我正在使用具有 1 个主节点、3 个核心节点(每个节点有 4 个 vcores)的 EMR 实例。因此,默认并行度为 12。我不会在运行时更改任何配置。但是我无法理解为什么在最后阶段创建了 26 个任务?据我了解,默认情况下随机播放分区应为 200。附上 UI 的屏幕截图。

在 Spark sql 中,shuffle 分区的数量使用 spark.sql.shuffle.partitions 设置,默认为 200。在大多数情况下,这个数字对于较小的数据来说太高了,对于更大的数据来说太小了.选择正确的值对开发人员来说总是很棘手。

所以我们需要能够通过查看映射器输出来合并随机分区。如果映射生成的分区数量较少,我们希望减少整体随机分区,从而提高性能。

在带有Adaptive Query Execution的最新版本Spark3.0中,这个减少任务的功能是自动化的。 http://blog.madhukaraphatak.com/spark-aqe-part-2/

在 Spark2.4.5 中考虑到这一点,catalist 优化器或 EMR 也可能启用了此功能以在内部减少任务而不是 200 个任务。

我在带有 Spark 2.4.5 的 Databricks 上尝试了类似的逻辑。

我观察到 spark.conf.set('spark.sql.adaptive.enabled', 'true'),我的分区的最终数量是 2。

我观察到 spark.conf.set('spark.sql.adaptive.enabled', 'false')spark.conf.set('spark.sql.shuffle.partitions', 75),我的分区的最终数量是 75。

使用 print(df_agg.rdd.getNumPartitions()) 揭示了这一点。

因此,Spark UI 上的作业输出并未反映这一点。可能是重新分区发生在最后。有趣,但不是真正的问题。