Spark:偷懒行动?
Spark: lazy action?
我正在开发一个复杂的应用程序。我们从源数据中计算出许多统计数据,例如 .
val df1 = sourceData.filter($"col1" === "val" and ...)
.select(...)
.groupBy(...)
.min()
val df2 = sourceData.filter($"col2" === "val" and ...)
.select(...)
.groupBy(...)
.count()
由于数据帧在同一列上分组,因此结果数据帧将分组在一起:
df1.join(df2, Seq("groupCol"), "full_outer")
.join(df3....)
.write.save(...)
(在我的代码中这是循环完成的)
这不是高性能的,问题是每个数据帧(我有大约 30 个)以一个动作结束,所以在我的理解中每个数据帧都被计算并返回给驱动程序,然后驱动程序将数据发送回执行程序以执行加入。
这给了我内存错误,我可以增加驱动程序内存,但我正在寻找更好的方法。对于前。如果仅在最后计算所有数据帧(保存连接的数据帧),我想一切都将由集群管理。
有没有办法做一种偷懒的动作?还是我应该以其他方式加入数据框?
感谢
首先,您显示的代码仅包含一个类似动作的操作 - DataFrameWriter.save
。所有其他组件都是惰性的。
但是懒惰在这里并不能真正帮助你。最大的问题(假设没有丑陋的数据倾斜或错误配置的广播)是各个聚合需要单独的洗牌和昂贵的后续合并。
一个天真的解决方案是利用:
the dataframe are grouped on the same columns
先洗牌:
val groupColumns: Seq[Column] = ???
val sourceDataPartitioned = sourceData.groupBy(groupColumns: _*)
并使用结果计算单个聚合
val df1 = sourceDataPartitioned
...
val df2 = sourceDataPartitioned
...
但是,这种方法相当脆弱,不太可能在存在大量/倾斜的群体中扩展。
因此,重写代码以仅执行聚合会好得多。幸运的是, 就是您所需要的。
让我们从将代码结构化为三个元素元组开始:
_1
是一个谓词(与 filter
一起使用的条件)。
_2
是您要为其计算聚合的 Columns
的列表。
_3
是聚合函数。
示例结构可以看这个:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{count, min}
val ops: Seq[(Column, Seq[Column], Column => Column)] = Seq(
($"col1" === "a" and $"col2" === "b", Seq($"col3", $"col4"), count),
($"col2" === "b" and $"col3" === "c", Seq($"col4", $"col5"), min)
)
现在您可以使用
编写聚合表达式
agg_function(when(predicate, column))
模式
import org.apache.spark.sql.functions.when
val exprs: Seq[Column] = ops.flatMap {
case (p, cols, f) => cols.map {
case c => f(when(p, c))
}
}
并在 sourceData
上使用它
sourceData.groupBy(groupColumns: _*).agg(exprs.head, exprs.tail: _*)
必要时添加aliases
。
我正在开发一个复杂的应用程序。我们从源数据中计算出许多统计数据,例如 .
val df1 = sourceData.filter($"col1" === "val" and ...)
.select(...)
.groupBy(...)
.min()
val df2 = sourceData.filter($"col2" === "val" and ...)
.select(...)
.groupBy(...)
.count()
由于数据帧在同一列上分组,因此结果数据帧将分组在一起:
df1.join(df2, Seq("groupCol"), "full_outer")
.join(df3....)
.write.save(...)
(在我的代码中这是循环完成的)
这不是高性能的,问题是每个数据帧(我有大约 30 个)以一个动作结束,所以在我的理解中每个数据帧都被计算并返回给驱动程序,然后驱动程序将数据发送回执行程序以执行加入。
这给了我内存错误,我可以增加驱动程序内存,但我正在寻找更好的方法。对于前。如果仅在最后计算所有数据帧(保存连接的数据帧),我想一切都将由集群管理。
有没有办法做一种偷懒的动作?还是我应该以其他方式加入数据框?
感谢
首先,您显示的代码仅包含一个类似动作的操作 - DataFrameWriter.save
。所有其他组件都是惰性的。
但是懒惰在这里并不能真正帮助你。最大的问题(假设没有丑陋的数据倾斜或错误配置的广播)是各个聚合需要单独的洗牌和昂贵的后续合并。
一个天真的解决方案是利用:
the dataframe are grouped on the same columns
先洗牌:
val groupColumns: Seq[Column] = ???
val sourceDataPartitioned = sourceData.groupBy(groupColumns: _*)
并使用结果计算单个聚合
val df1 = sourceDataPartitioned
...
val df2 = sourceDataPartitioned
...
但是,这种方法相当脆弱,不太可能在存在大量/倾斜的群体中扩展。
因此,重写代码以仅执行聚合会好得多。幸运的是,
让我们从将代码结构化为三个元素元组开始:
_1
是一个谓词(与filter
一起使用的条件)。_2
是您要为其计算聚合的Columns
的列表。_3
是聚合函数。
示例结构可以看这个:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{count, min}
val ops: Seq[(Column, Seq[Column], Column => Column)] = Seq(
($"col1" === "a" and $"col2" === "b", Seq($"col3", $"col4"), count),
($"col2" === "b" and $"col3" === "c", Seq($"col4", $"col5"), min)
)
现在您可以使用
编写聚合表达式agg_function(when(predicate, column))
模式
import org.apache.spark.sql.functions.when
val exprs: Seq[Column] = ops.flatMap {
case (p, cols, f) => cols.map {
case c => f(when(p, c))
}
}
并在 sourceData
sourceData.groupBy(groupColumns: _*).agg(exprs.head, exprs.tail: _*)
必要时添加aliases
。