Spark final task比first 199耗时100倍,如何改进

Spark final task takes 100x times longer than first 199, how to improve

我在 运行 使用数据帧进行查询时发现了一些性能问题。我在研究中看到,长 运行 finally 任务可能是数据未受到最佳干扰的标志,但尚未找到解决此问题的详细过程。

我开始加载两个表作为数据框,然后将这些表连接到一个字段中。我尝试添加分发依据(重新分区)和排序依据以提高性能,但我仍然看到这个单一的长 运行 最终任务。这是我的代码的一个简单版本,请注意,查询一和查询二实际上并不是这么简单,而是使用 UDF 来计算一些值。

我已经为 spark.sql.shuffle 尝试了几种不同的设置。我已经尝试了 100 次,但都失败了(老实说,我并没有真正调试它)。我尝试了 300、4000 和 8000。性能随着每次增加而下降。我正在选择一天的数据,其中每个文件是一个小时。

val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

val distributeDf1 = df1
    .repartition(df1("userId"))
    .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
    .repartition(df2("userId"))
    .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

val df3 = sqlContext
  .sql("""
    Select 
      df1.* 
    from 
      df1 
    left outer join df2 on 
      df1.userId = df2.userId""")

由于按 userId 分区似乎不太理想,我可以改为按时间戳分区。如果我这样做,我应该只做 Date + Hour 吗?如果我的独特组合少于 200 个,我会有空执行器吗?

Spark >= 3.0

从 3.0 开始,Spark 提供了 built-in 处理倾斜连接的优化 - 可以使用 spark.sql.adaptive.optimizeSkewedJoin.enabled 属性 启用。

详情见SPARK-29544

Spark < 3.0

你显然有一个巨大的正确数据倾斜的问题。让我们来看看 statistics you've provided:

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
df2 = [mean=1.0, stddev=0.0, count=18408194]

平均数约为 5,标准差超过 2000,你会得到一个 长尾巴

由于在重新分区后一些键比其他键更频繁,一些执行器将比其余的执行器有更多的工作要做。

此外,您的描述表明问题可能出在散列到同一分区的单个或几个键上。

所以,让我们首先识别异常值(伪代码):

val mean = 4.989209978967438 
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")
val counts = df.groupBy("userId").count.cache

val frequent = counts
  .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))

其余:

val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))

这真的是意料之中的事情吗?如果不是,请尝试在上游找出问题的根源。

如果符合预期,可以试试:

  • 广播变小table:

    val df2 = sqlContext.sql("Select * from Table2")
    df2.join(broadcast(df1), Seq("userId"), "rightouter")
    
  • 分裂,统一(union)和广播只频繁:

    df2.join(broadcast(frequent), Seq("userId"), "rightouter")
      .union(df2.join(infrequent, Seq("userId"), "rightouter"))
    
  • 用一些随机数据加盐userId

但是你不应该:

  • 重新分区所有数据并在本地排序(尽管单独在本地排序应该不是问题)
  • 对完整数据执行标准散列连接。