Spark 动态 DAG 比硬编码 DAG 慢很多
Spark dynamic DAG is a lot slower and different from hard coded DAG
我在 spark 中有一个操作,应该对数据框中的多个列执行。一般来说,有2种可能来指定这样的操作
- 硬编码
handleBias("bar", df)
.join(handleBias("baz", df), df.columns)
.drop(columnsToDrop: _*).show
- 从列名列表中动态生成它们
var isFirst = true
var res = df
for (col <- columnsToDrop ++ columnsToCode) {
if (isFirst) {
res = handleBias(col, res)
isFirst = false
} else {
res = handleBias(col, res)
}
}
res.drop(columnsToDrop: _*).show
问题是动态生成的 DAG 是不同的,当使用更多的列时,动态解决方案的运行时间比硬编码操作增加的多得多。
我很好奇如何将动态构造的优雅与快速执行时间结合起来。
这是示例代码的 DAG 比较
对于大约 80 列,这会为硬编码变体生成一个相当不错的图表
对于动态构造的查询,还有一个非常大的、可能不太可并行化且速度较慢的 DAG。
当前版本的 spark (2.0.2) 与 DataFrames
和 spark-sql
一起使用
完成最小示例的代码:
def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
val pre1_1 = df
.filter(df(target) === 1)
.groupBy(col, target)
.agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
.drop(target)
val pre2_1 = df
.groupBy(col)
.agg(mean(target).alias("pre2_" + col))
df
.join(pre1_1, Seq(col), "left")
.join(pre2_1, Seq(col), "left")
.na.fill(0)
}
编辑
运行 你的任务 foldleft
生成一个线性 DAG
并对所有列的函数进行硬编码导致
两者都比我原来的 DAG 好很多,但硬编码的变体对我来说看起来更好。在 spark 中连接 SQL 语句的字符串可以让我动态生成硬编码执行图,但这看起来很丑陋。你看到任何其他选项吗?
编辑 1: 从 handleBias 中删除了一个 window 函数并将其转换为广播连接。
编辑 2: 更改了空值的替换策略。
我有一些建议可以改进您的代码。首先,对于 "handleBias" 函数,我会使用 window 函数和 "withColumn" 调用来完成它,避免连接:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
def handleBias(df: DataFrame, colName: String, target: String = "foo") = {
val w1 = Window.partitionBy(colName)
val w2 = Window.partitionBy(colName, target)
val result = df
.withColumn("cnt_group", count("*").over(w2))
.withColumn("pre2_" + colName, mean(target).over(w1))
.withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
.drop("cnt_group")
result
}
然后,为了为多列调用它,我建议使用 foldLeft
,这是解决此类问题的 "functional" 方法:
val df = Seq((1, "first", "A"), (1, "second", "A"),(2, "noValidFormat", "B"),(1, "lastAssumingSameDate", "C")).toDF("foo", "bar", "baz")
val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"
val targetCounts = df.filter(df(target) === 1).groupBy(target)
.agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(df) {
(currentDF, colName) => handleBias(currentDF, colName)
}
result.drop(columnsToDrop:_*).show()
+---+--------------------+------------------+--------+------------------+--------+
|foo| bar| pre_baz|pre2_baz| pre_bar|pre2_bar|
+---+--------------------+------------------+--------+------------------+--------+
| 2| noValidFormat| 0.0| 2.0| 0.0| 2.0|
| 1|lastAssumingSameDate|0.3333333333333333| 1.0|0.3333333333333333| 1.0|
| 1| second|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
| 1| first|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
+---+--------------------+------------------+--------+------------------+--------+
我不确定它会大大改进您的 DAG,但至少它使代码更清晰、更易读。
参考:
我在 spark 中有一个操作,应该对数据框中的多个列执行。一般来说,有2种可能来指定这样的操作
- 硬编码
handleBias("bar", df)
.join(handleBias("baz", df), df.columns)
.drop(columnsToDrop: _*).show
- 从列名列表中动态生成它们
var isFirst = true
var res = df
for (col <- columnsToDrop ++ columnsToCode) {
if (isFirst) {
res = handleBias(col, res)
isFirst = false
} else {
res = handleBias(col, res)
}
}
res.drop(columnsToDrop: _*).show
问题是动态生成的 DAG 是不同的,当使用更多的列时,动态解决方案的运行时间比硬编码操作增加的多得多。
我很好奇如何将动态构造的优雅与快速执行时间结合起来。
这是示例代码的 DAG 比较
对于大约 80 列,这会为硬编码变体生成一个相当不错的图表
当前版本的 spark (2.0.2) 与 DataFrames
和 spark-sql
完成最小示例的代码:
def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
val pre1_1 = df
.filter(df(target) === 1)
.groupBy(col, target)
.agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
.drop(target)
val pre2_1 = df
.groupBy(col)
.agg(mean(target).alias("pre2_" + col))
df
.join(pre1_1, Seq(col), "left")
.join(pre2_1, Seq(col), "left")
.na.fill(0)
}
编辑
运行 你的任务 foldleft
生成一个线性 DAG
两者都比我原来的 DAG 好很多,但硬编码的变体对我来说看起来更好。在 spark 中连接 SQL 语句的字符串可以让我动态生成硬编码执行图,但这看起来很丑陋。你看到任何其他选项吗?
编辑 1: 从 handleBias 中删除了一个 window 函数并将其转换为广播连接。
编辑 2: 更改了空值的替换策略。
我有一些建议可以改进您的代码。首先,对于 "handleBias" 函数,我会使用 window 函数和 "withColumn" 调用来完成它,避免连接:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
def handleBias(df: DataFrame, colName: String, target: String = "foo") = {
val w1 = Window.partitionBy(colName)
val w2 = Window.partitionBy(colName, target)
val result = df
.withColumn("cnt_group", count("*").over(w2))
.withColumn("pre2_" + colName, mean(target).over(w1))
.withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
.drop("cnt_group")
result
}
然后,为了为多列调用它,我建议使用 foldLeft
,这是解决此类问题的 "functional" 方法:
val df = Seq((1, "first", "A"), (1, "second", "A"),(2, "noValidFormat", "B"),(1, "lastAssumingSameDate", "C")).toDF("foo", "bar", "baz")
val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"
val targetCounts = df.filter(df(target) === 1).groupBy(target)
.agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(df) {
(currentDF, colName) => handleBias(currentDF, colName)
}
result.drop(columnsToDrop:_*).show()
+---+--------------------+------------------+--------+------------------+--------+
|foo| bar| pre_baz|pre2_baz| pre_bar|pre2_bar|
+---+--------------------+------------------+--------+------------------+--------+
| 2| noValidFormat| 0.0| 2.0| 0.0| 2.0|
| 1|lastAssumingSameDate|0.3333333333333333| 1.0|0.3333333333333333| 1.0|
| 1| second|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
| 1| first|0.6666666666666666| 1.0|0.3333333333333333| 1.0|
+---+--------------------+------------------+--------+------------------+--------+
我不确定它会大大改进您的 DAG,但至少它使代码更清晰、更易读。
参考: