增加读取镶木地板文件的并行度 - Spark 优化自连接
Increase parallelism of reading a parquet file - Spark optimize self join
我想执行自连接以生成候选匹配对。目前,这不起作用,因为此操作太慢了。不幸的是,我无法广播数据帧,因为它们太大了。
首先我聚合了元组的数量来减少数据:
val aggregated = df.groupBy("discrete_foo", "discrete_bar").agg(sum("value"))
aggregated.repartition(7, "discrete_foo", "discrete_bar").sortWithinPartitions("discreate_foo", "discrete_bar, "baz").write.parquet.option("compression", "gzip").mode("overwrite")
这很好用而且速度很快。然后,我想执行一个自连接来生成候选人。
我已经观察到我需要产生更多的并行性:
--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \
因此,设置了增加的默认值和随机播放并行度。此外,我尝试粗化两个离散值(即增加落入离散块的项目数量),从而减少元组的数量。仍然没有运气。所以我另外尝试通过重新分区来强制执行更多的任务:
val materializedAggregated= spark.read.parquet(s"path/to/file/aggregated_stuff" )
.repartition(4000)
val selfB = materializedAggregated
.withColumnRenamed("baz", "other_batz")
.withColumnRenamed("value", "other_value")
val candidates = materializedMultiSTW
.join(selfB, Seq("discrete_foo", "discrete_bar"), "inner")
.filter(col(FeedIdentifierColumns.imsiHash) =!= col("other_imsi_hash"))
但是这也不起作用,而且速度太慢了。我还能做些什么来使这个查询计算得更快?有什么我想念的吗?
下面您将看到各种尝试在读取自连接数据时增加并行度的失败尝试。
我什至设置了:
--conf spark.sql.files.maxPartitionBytes=16777216 \
到 1/8,即 16 vs. 128MB,生成的任务数量仍然太少,即只有 250 个。
一些细节
执行计划:
即使没有这个手动重新分区也太慢了,我担心创建的分区不够:
处理的任务更少 - 这很可能会使速度变慢:
如何确保这个初始步骤具有更高的并行度?
分桶有帮助吗?但是当只读取一次打乱的数据时——它不会真正产生加速——对吧?
写入聚合文件时的重新分区步骤如何?我应该在这里设置更高的数字吗?
到目前为止,即使省略它(并且基本上重新计算聚合两次) - 它也不会增加超过 260 个任务。
环境
我在 HDP 3.1
上使用 spark 2.3.x
无论 spark.sql.shuffle.partitions
和 spark.default.parallelism
的设置如何,内部连接的最大任务数将等于连接键的数量(即它们的基数)。
这是因为在 SortMergeJoin 中,数据将使用连接键的哈希值进行混洗。来自每个不同连接键的所有数据都将发送给单个执行程序。
因此,问题是您没有足够的垃圾箱 - 它们太粗糙了。您将看到的最大任务数将等于垃圾箱的数量。
如果您以更细粒度的方式对数据进行分类,您应该会看到任务数量增加。
我想执行自连接以生成候选匹配对。目前,这不起作用,因为此操作太慢了。不幸的是,我无法广播数据帧,因为它们太大了。
首先我聚合了元组的数量来减少数据:
val aggregated = df.groupBy("discrete_foo", "discrete_bar").agg(sum("value"))
aggregated.repartition(7, "discrete_foo", "discrete_bar").sortWithinPartitions("discreate_foo", "discrete_bar, "baz").write.parquet.option("compression", "gzip").mode("overwrite")
这很好用而且速度很快。然后,我想执行一个自连接来生成候选人。 我已经观察到我需要产生更多的并行性:
--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \
因此,设置了增加的默认值和随机播放并行度。此外,我尝试粗化两个离散值(即增加落入离散块的项目数量),从而减少元组的数量。仍然没有运气。所以我另外尝试通过重新分区来强制执行更多的任务:
val materializedAggregated= spark.read.parquet(s"path/to/file/aggregated_stuff" )
.repartition(4000)
val selfB = materializedAggregated
.withColumnRenamed("baz", "other_batz")
.withColumnRenamed("value", "other_value")
val candidates = materializedMultiSTW
.join(selfB, Seq("discrete_foo", "discrete_bar"), "inner")
.filter(col(FeedIdentifierColumns.imsiHash) =!= col("other_imsi_hash"))
但是这也不起作用,而且速度太慢了。我还能做些什么来使这个查询计算得更快?有什么我想念的吗?
下面您将看到各种尝试在读取自连接数据时增加并行度的失败尝试。
我什至设置了:
--conf spark.sql.files.maxPartitionBytes=16777216 \
到 1/8,即 16 vs. 128MB,生成的任务数量仍然太少,即只有 250 个。
一些细节
执行计划:
即使没有这个手动重新分区也太慢了,我担心创建的分区不够:
处理的任务更少 - 这很可能会使速度变慢:
如何确保这个初始步骤具有更高的并行度? 分桶有帮助吗?但是当只读取一次打乱的数据时——它不会真正产生加速——对吧? 写入聚合文件时的重新分区步骤如何?我应该在这里设置更高的数字吗? 到目前为止,即使省略它(并且基本上重新计算聚合两次) - 它也不会增加超过 260 个任务。
环境
我在 HDP 3.1
上使用 spark 2.3.x无论 spark.sql.shuffle.partitions
和 spark.default.parallelism
的设置如何,内部连接的最大任务数将等于连接键的数量(即它们的基数)。
这是因为在 SortMergeJoin 中,数据将使用连接键的哈希值进行混洗。来自每个不同连接键的所有数据都将发送给单个执行程序。
因此,问题是您没有足够的垃圾箱 - 它们太粗糙了。您将看到的最大任务数将等于垃圾箱的数量。
如果您以更细粒度的方式对数据进行分类,您应该会看到任务数量增加。