Spark Dataset/Dataframe 加入 NULL 倾斜键

Spark Dataset/Dataframe join NULL skew key

使用 Spark Dataset/DataFrame 连接时,我遇到了很长 运行 并且因 OOM 作业而失败。

这里是输入:

经过一些分析,我发现作业失败和缓慢的原因是 null 倾斜键:当左侧有数百万条带有连接键的记录时 null

我做了一些暴力破解的方法来解决这个问题,在这里我想分享一下。

如果您有更好的或任何内置的解决方案(对于常规 Apache Spark),请分享。

这是我得出的解决方案:

  /**
    * Expression that produce negative random between -1 and -`lowestValue`(inclusively).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("negative", negativeRandomWithin(3))
    *                  .select("negative")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |negative|
    *          +--------+
    *          |-2      |
    *          |-3      |
    *          |-1      |
    *          +--------+
    */
  private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
    negate(positiveRandomWithin(lowestValue)) - 1
  }

  /**
    * Expression that produce positive random between 0 and `highestValue`(exclusively).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("positive", positiveRandomWithin(3))
    *                  .select("positive")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |positive|
    *          +--------+
    *          |0       |
    *          |1       |
    *          |2       |
    *          +--------+
    */
  private[transformation] def positiveRandomWithin(highestValue: Long) = {
    pmod((rand * highestValue).cast(LongType), lit(highestValue))
  }

  implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {

    /**
      * Particular optimized version of left outer join where left side of join has skewed `null` field.
      *
      * @note
      *       It works only for single column join which is applicable for `isNotNull`.
      *
      * Optimization algorithm:
      *   1. replace left dataset `null` values with negative number within range between -1 and - `nullNumBuckets`(10000 by default)
      *   2. use appended column, with original join column value and `null` replacements, as join column from left dataset
      *      appended column name builds using original left join column and `skewedColumnPostFix` separated by underscore.
      *
      * @note there is no checks how many `null` values on left dataset before applying above steps,
      *       as well as there is no checks does it sort merge join or broadcast.
      *
      * IMPORTANT: If left dataset already has appended column name, it will be reused to benefit already repartitioned data on the left
      *
      * HIGHLY IMPORTANT: right dataset should not contain negative values in `joinRightCol`
      */
    private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                                 joinLeftCol: Column,
                                                 joinRightCol: Column,
                                                 skewedColumnPostFix: String = "skewed_column",
                                                 nullNumBuckets: Int = 10000): DataFrame = {

      val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

      if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
        underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      } else {
        underlying
          .withColumn(skewedTempColumn,
                      when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
          .join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      }
    }
  }

简而言之:我将左数据集连接键 null 值替换为负范围,以使其均匀重新分区。

注意:此解决方案仅适用于左连接和 null 连接键倾斜。我不想分解正确的数据集并为任何键做倾斜解决方案。此外,在该步骤之后,null 连接键值将分布到不同的分区,因此,mapPartitions 等将不起作用。

总而言之,上述解决方案对我有所帮助,但我希望看到针对此类数据集连接问题的更多解决方案。

我前段时间遇到了同样的问题,但在做了一些性能测试后,我选择了另一种方法。这取决于你的数据,数据会告诉你解决这个连接问题的更好算法是什么。

在我的例子中,我有超过 30% 的数据在 join 的左侧为 null,并且数据是 parquet 格式。鉴于此,我最好执行一个 filter,其中此键为空且此键不为空,仅在非空时加入,然后合并两个数据。

val data = ...
val notJoinable = data.filter('keyToJoin.isNull)
val joinable = data.filter('keyToJoin.isNotNull)

joinable.join(...) union notJoinable

它也避免了热点。如果我使用你的方法(负 numbers/whatever 而不是 -"joinable" 值),spark 将洗牌所有这些数据,这是很多数据(超过 30%)。

只是想向您展示另一种解决问题的方法,