增加读取镶木地板文件的并行度 - Spark 优化自连接

Increase parallelism of reading a parquet file - Spark optimize self join

我想执行自连接以生成候选匹配对。目前,这不起作用,因为此操作太慢了。不幸的是,我无法广播数据帧,因为它们太大了。

首先我聚合了元组的数量来减少数据:

val aggregated = df.groupBy("discrete_foo", "discrete_bar").agg(sum("value"))
aggregated.repartition(7, "discrete_foo", "discrete_bar").sortWithinPartitions("discreate_foo", "discrete_bar, "baz").write.parquet.option("compression", "gzip").mode("overwrite")

这很好用而且速度很快。然后,我想执行一个自连接来生成候选人。 我已经观察到我需要产生更多的并行性:

--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \ 

因此,设置了增加的默认值和随机播放并行度。此外,我尝试粗化两个离散值(即增加落入离散块的项目数量),从而减少元组的数量。仍然没有运气。所以我另外尝试通过重新分区来强制执行更多的任务:

val materializedAggregated= spark.read.parquet(s"path/to/file/aggregated_stuff"      )
  .repartition(4000)
val selfB = materializedAggregated
  .withColumnRenamed("baz", "other_batz")
  .withColumnRenamed("value", "other_value")

val candidates = materializedMultiSTW
  .join(selfB, Seq("discrete_foo", "discrete_bar"), "inner")
  .filter(col(FeedIdentifierColumns.imsiHash) =!= col("other_imsi_hash"))

但是这也不起作用,而且速度太慢了。我还能做些什么来使这个查询计算得更快?有什么我想念的吗?

下面您将看到各种尝试在读取自连接数据时增加并行度的失败尝试。

我什至设置了:

--conf spark.sql.files.maxPartitionBytes=16777216 \

到 1/8,即 16 vs. 128MB,生成的任务数量仍然太少,即只有 250 个。

一些细节

执行计划:

即使没有这个手动重新分区也太慢了,我担心创建的分区不够:

处理的任务更少 - 这很可能会使速度变慢:

如何确保这个初始步骤具有更高的并行度? 分桶有帮助吗?但是当只读取一次打乱的数据时——它不会真正产生加速——对吧? 写入聚合文件时的重新分区步骤如何?我应该在这里设置更高的数字吗? 到目前为止,即使省略它(并且基本上重新计算聚合两次) - 它也不会增加超过 260 个任务。

环境

我在 HDP 3.1

上使用 spark 2.3.x

无论 spark.sql.shuffle.partitionsspark.default.parallelism 的设置如何,内部连接的最大任务数将等于连接键的数量(即它们的基数)。

这是因为在 SortMergeJoin 中,数据将使用连接键的哈希值进行混洗。来自每个不同连接键的所有数据都将发送给单个执行程序。

因此,问题是您没有足够的垃圾箱 - 它们太粗糙了。您将看到的最大任务数将等于垃圾箱的数量。

如果您以更细粒度的方式对数据进行分类,您应该会看到任务数量增加。