Spark shuffle 读取小数据需要大量时间

Spark shuffle read takes significant time for small data

我们正在 运行 下一个阶段 DAG,并且对于相对较小的混洗数据大小(每个任务大约 19MB)经历了较长的混洗读取时间

一个有趣的方面是每个 executor/server 中的等待任务具有等效的随机读取时间。这是它的含义的一个示例:对于以下服务器,一组任务等待大约 7.7 分钟,另一组任务等待大约 26 秒。

这是同一阶段的另一个例子运行。该图显示了 3 个执行器/服务器,每个执行器/服务器都有统一的任务组,随机读取时间相同。蓝色组代表由于推测执行而被杀死的任务:

并不是所有的执行者都是这样的。有一些几乎是一致的在几秒钟内完成所有任务,这些任务的远程读取数据大小与在其他服务器上等待很长时间的任务相同。 此外,这种类型的阶段 运行s 在我们的应用程序 运行 时间内出现了 2 次。产生这些大shuffle read time任务组的servers/executors在每个阶段运行.

是不同的

这是其中一个服务器/主机的任务统计信息示例 table:

看起来负责此 DAG 的代码如下:

output.write.parquet("output.parquet")
comparison.write.parquet("comparison.parquet")
output.union(comparison).write.parquet("output_comparison.parquet")
val comparison = data.union(output).except(data.intersect(output)).cache()
comparison.filter(_.abc != "M").count()

我们非常感谢您对此的看法。

显然问题出在 JVM 垃圾回收 (GC) 上。这些任务必须等到远程执行器上的 GC 完成。等效的随机读取时间是由于多个任务正在等待执行 GC 的单个远程主机这一事实造成的。我们遵循了 发布的建议,问题减少了一个数量级。远程主机上的 GC 时间与本地随机读取时间之间仍然存在很小的相关性。以后想试试shuffle服务。

自从 google 带着同样的问题来到这里,但我需要另一个解决方案...

小的 shuffle 大小需要很长时间读取的另一个可能原因可能是数据被拆分到多个分区。例如(抱歉,这是 pyspark,因为我只用过它):

my_df_with_many_partitions\ # say has 1000 partitions
    .filter(very_specific_filter)\ # only very few rows pass
    .groupby('blah')\
    .count()

上面filter的shuffle write会很小,所以后面的stage我们读的量会很小。但是要阅读它,您需要检查很多空分区。解决这个问题的一种方法是:

my_df_with_many_partitions\
    .filter(very_specific_filter)\
    .repartition(1)\
    .groupby('blah')\
    .count()