Spark 动态 DAG 比硬编码 DAG 慢很多

Spark dynamic DAG is a lot slower and different from hard coded DAG

我在 spark 中有一个操作,应该对数据框中的多个列执行。一般来说,有2种可能来指定这样的操作

handleBias("bar", df)
  .join(handleBias("baz", df), df.columns)
  .drop(columnsToDrop: _*).show
var isFirst = true
var res = df
for (col <- columnsToDrop ++ columnsToCode) {
  if (isFirst) {
    res = handleBias(col, res)
    isFirst = false
  } else {
    res = handleBias(col, res)
  }
}
res.drop(columnsToDrop: _*).show

问题是动态生成的 DAG 是不同的,当使用更多的列时,动态解决方案的运行时间比硬编码操作增加的多得多。

我很好奇如何将动态构造的优雅与快速执行时间结合起来。

这是示例代码的 DAG 比较

对于大约 80 列,这会为硬编码变体生成一个相当不错的图表 对于动态构造的查询,还有一个非常大的、可能不太可并行化且速度较慢的 DAG。

当前版本的 spark (2.0.2) 与 DataFrames 和 spark-sql

一起使用

完成最小示例的代码:

def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target)

  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))

  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)
}

编辑

运行 你的任务 foldleft 生成一个线性 DAG 并对所有列的函数进行硬编码导致

两者都比我原来的 DAG 好很多,但硬编码的变体对我来说看起来更好。在 spark 中连接 SQL 语句的字符串可以让我动态生成硬编码执行图,但这看起来很丑陋。你看到任何其他选项吗?

编辑 1: 从 handleBias 中删除了一个 window 函数并将其转换为广播连接。

编辑 2: 更改了空值的替换策略。

我有一些建议可以改进您的代码。首先,对于 "handleBias" 函数,我会使用 window 函数和 "withColumn" 调用来完成它,避免连接:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

def handleBias(df: DataFrame, colName: String, target: String = "foo") = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)
  val result = df
    .withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
  result
}

然后,为了为多列调用它,我建议使用 foldLeft,这是解决此类问题的 "functional" 方法:

val df = Seq((1, "first", "A"), (1, "second", "A"),(2, "noValidFormat", "B"),(1, "lastAssumingSameDate", "C")).toDF("foo", "bar", "baz")

val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"

val targetCounts = df.filter(df(target) === 1).groupBy(target)
  .agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(df) {
  (currentDF, colName) => handleBias(currentDF, colName)
}

result.drop(columnsToDrop:_*).show()

+---+--------------------+------------------+--------+------------------+--------+
|foo|                 bar|           pre_baz|pre2_baz|           pre_bar|pre2_bar|
+---+--------------------+------------------+--------+------------------+--------+
|  2|       noValidFormat|               0.0|     2.0|               0.0|     2.0|
|  1|lastAssumingSameDate|0.3333333333333333|     1.0|0.3333333333333333|     1.0|
|  1|              second|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
|  1|               first|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
+---+--------------------+------------------+--------+------------------+--------+

我不确定它会大大改进您的 DAG,但至少它使代码更清晰、更易读。

参考: