Spark final task比first 199耗时100倍,如何改进
Spark final task takes 100x times longer than first 199, how to improve
我在 运行 使用数据帧进行查询时发现了一些性能问题。我在研究中看到,长 运行 finally 任务可能是数据未受到最佳干扰的标志,但尚未找到解决此问题的详细过程。
我开始加载两个表作为数据框,然后将这些表连接到一个字段中。我尝试添加分发依据(重新分区)和排序依据以提高性能,但我仍然看到这个单一的长 运行 最终任务。这是我的代码的一个简单版本,请注意,查询一和查询二实际上并不是这么简单,而是使用 UDF 来计算一些值。
我已经为 spark.sql.shuffle
尝试了几种不同的设置。我已经尝试了 100 次,但都失败了(老实说,我并没有真正调试它)。我尝试了 300、4000 和 8000。性能随着每次增加而下降。我正在选择一天的数据,其中每个文件是一个小时。
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")
val distributeDf1 = df1
.repartition(df1("userId"))
.sortWithinPartitions(df1("userId"))
val distributeDf2 = df2
.repartition(df2("userId"))
.sortWithinPartitions(df2("userId"))
distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")
val df3 = sqlContext
.sql("""
Select
df1.*
from
df1
left outer join df2 on
df1.userId = df2.userId""")
由于按 userId 分区似乎不太理想,我可以改为按时间戳分区。如果我这样做,我应该只做 Date + Hour 吗?如果我的独特组合少于 200 个,我会有空执行器吗?
Spark >= 3.0
从 3.0 开始,Spark 提供了 built-in 处理倾斜连接的优化 - 可以使用 spark.sql.adaptive.optimizeSkewedJoin.enabled
属性 启用。
详情见SPARK-29544。
Spark < 3.0
你显然有一个巨大的正确数据倾斜的问题。让我们来看看 statistics you've provided:
df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088]
df2 = [mean=1.0, stddev=0.0, count=18408194]
平均数约为 5,标准差超过 2000,你会得到一个 长尾巴 。
由于在重新分区后一些键比其他键更频繁,一些执行器将比其余的执行器有更多的工作要做。
此外,您的描述表明问题可能出在散列到同一分区的单个或几个键上。
所以,让我们首先识别异常值(伪代码):
val mean = 4.989209978967438
val sd = 2255.654165352454
val df1 = sqlContext.sql("Select * from Table1")
val counts = df.groupBy("userId").count.cache
val frequent = counts
.where($"count" > mean + 2 * sd) // Adjust threshold based on actual dist.
.alias("frequent")
.join(df1, Seq("userId"))
其余:
val infrequent = counts
.where($"count" <= mean + 2 * sd)
.alias("infrequent")
.join(df1, Seq("userId"))
这真的是意料之中的事情吗?如果不是,请尝试在上游找出问题的根源。
如果符合预期,可以试试:
广播变小table:
val df2 = sqlContext.sql("Select * from Table2")
df2.join(broadcast(df1), Seq("userId"), "rightouter")
分裂,统一(union
)和广播只频繁:
df2.join(broadcast(frequent), Seq("userId"), "rightouter")
.union(df2.join(infrequent, Seq("userId"), "rightouter"))
用一些随机数据加盐userId
但是你不应该:
- 重新分区所有数据并在本地排序(尽管单独在本地排序应该不是问题)
- 对完整数据执行标准散列连接。
我在 运行 使用数据帧进行查询时发现了一些性能问题。我在研究中看到,长 运行 finally 任务可能是数据未受到最佳干扰的标志,但尚未找到解决此问题的详细过程。
我开始加载两个表作为数据框,然后将这些表连接到一个字段中。我尝试添加分发依据(重新分区)和排序依据以提高性能,但我仍然看到这个单一的长 运行 最终任务。这是我的代码的一个简单版本,请注意,查询一和查询二实际上并不是这么简单,而是使用 UDF 来计算一些值。
我已经为 spark.sql.shuffle
尝试了几种不同的设置。我已经尝试了 100 次,但都失败了(老实说,我并没有真正调试它)。我尝试了 300、4000 和 8000。性能随着每次增加而下降。我正在选择一天的数据,其中每个文件是一个小时。
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")
val distributeDf1 = df1
.repartition(df1("userId"))
.sortWithinPartitions(df1("userId"))
val distributeDf2 = df2
.repartition(df2("userId"))
.sortWithinPartitions(df2("userId"))
distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")
val df3 = sqlContext
.sql("""
Select
df1.*
from
df1
left outer join df2 on
df1.userId = df2.userId""")
由于按 userId 分区似乎不太理想,我可以改为按时间戳分区。如果我这样做,我应该只做 Date + Hour 吗?如果我的独特组合少于 200 个,我会有空执行器吗?
Spark >= 3.0
从 3.0 开始,Spark 提供了 built-in 处理倾斜连接的优化 - 可以使用 spark.sql.adaptive.optimizeSkewedJoin.enabled
属性 启用。
详情见SPARK-29544。
Spark < 3.0
你显然有一个巨大的正确数据倾斜的问题。让我们来看看 statistics you've provided:
df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088]
df2 = [mean=1.0, stddev=0.0, count=18408194]
平均数约为 5,标准差超过 2000,你会得到一个 长尾巴 。
由于在重新分区后一些键比其他键更频繁,一些执行器将比其余的执行器有更多的工作要做。
此外,您的描述表明问题可能出在散列到同一分区的单个或几个键上。
所以,让我们首先识别异常值(伪代码):
val mean = 4.989209978967438
val sd = 2255.654165352454
val df1 = sqlContext.sql("Select * from Table1")
val counts = df.groupBy("userId").count.cache
val frequent = counts
.where($"count" > mean + 2 * sd) // Adjust threshold based on actual dist.
.alias("frequent")
.join(df1, Seq("userId"))
其余:
val infrequent = counts
.where($"count" <= mean + 2 * sd)
.alias("infrequent")
.join(df1, Seq("userId"))
这真的是意料之中的事情吗?如果不是,请尝试在上游找出问题的根源。
如果符合预期,可以试试:
广播变小table:
val df2 = sqlContext.sql("Select * from Table2") df2.join(broadcast(df1), Seq("userId"), "rightouter")
分裂,统一(
union
)和广播只频繁:df2.join(broadcast(frequent), Seq("userId"), "rightouter") .union(df2.join(infrequent, Seq("userId"), "rightouter"))
用一些随机数据加盐
userId
但是你不应该:
- 重新分区所有数据并在本地排序(尽管单独在本地排序应该不是问题)
- 对完整数据执行标准散列连接。