Spark:偷懒行动?

Spark: lazy action?

我正在开发一个复杂的应用程序。我们从源数据中计算出许多统计数据,例如 .

val df1 = sourceData.filter($"col1" === "val" and ...)
     .select(...)
     .groupBy(...)
     .min()

val df2 = sourceData.filter($"col2" === "val" and ...)
     .select(...)
     .groupBy(...)
     .count()

由于数据帧在同一列上分组,因此结果数据帧将分组在一起:

df1.join(df2, Seq("groupCol"), "full_outer")
    .join(df3....) 
    .write.save(...)

(在我的代码中这是循环完成的)

这不是高性能的,问题是每个数据帧(我有大约 30 个)以一个动作结束,所以在我的理解中每个数据帧都被计算并返回给驱动程序,然后驱动程序将数据发送回执行程序以执行加入。

这给了我内存错误,我可以增加驱动程序内存,但我正在寻找更好的方法。对于前。如果仅在最后计算所有数据帧(保存连接的数据帧),我想一切都将由集群管理。

有没有办法做一种偷懒的动作?还是我应该以其他方式加入数据框?

感谢

首先,您显示的代码仅包含一个类似动作的操作 - DataFrameWriter.save。所有其他组件都是惰性的。

但是懒惰在这里并不能真正帮助你。最大的问题(假设没有丑陋的数据倾斜或错误配置的广播)是各个聚合需要单独的洗牌和昂贵的后续合并。

一个天真的解决方案是利用:

the dataframe are grouped on the same columns

先洗牌:

val groupColumns: Seq[Column] = ???

val sourceDataPartitioned =  sourceData.groupBy(groupColumns: _*)

并使用结果计算单个聚合

val df1 = sourceDataPartitioned
  ...

val df2 = sourceDataPartitioned
  ...

但是,这种方法相当脆弱,不太可能在存在大量/倾斜的群体中扩展。

因此,重写代码以仅执行聚合会好得多。幸运的是, 就是您所需要的。

让我们从将代码结构化为三个元素元组开始:

  • _1 是一个谓词(与 filter 一起使用的条件)。
  • _2 是您要为其计算聚合的 Columns 的列表。
  • _3 是聚合函数。

示例结构可以看这个:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{count, min}

val ops: Seq[(Column, Seq[Column], Column => Column)] = Seq(
  ($"col1" === "a" and $"col2" === "b", Seq($"col3", $"col4"), count),
  ($"col2" === "b" and $"col3" === "c", Seq($"col4", $"col5"), min)
)

现在您可以使用

编写聚合表达式
agg_function(when(predicate, column)) 

模式

import org.apache.spark.sql.functions.when

val exprs: Seq[Column] = ops.flatMap {
    case (p, cols, f) => cols.map {
      case c => f(when(p, c))
    }
}

并在 sourceData

上使用它
sourceData.groupBy(groupColumns: _*).agg(exprs.head, exprs.tail: _*)

必要时添加aliases