在分散流程之前缓存在火花中

Caching in spark before diverging the flow

我有一个关于使用 Spark DataFrame 的基本问题。

考虑以下伪代码:

val df1 = // Lazy Read from csv and create dataframe
val df2 = // Filter df1 on some condition
val df3 = // Group by on df2 on certain columns
val df4 = // Join df3 with some other df

val subdf1 = // All records from df4 where id < 0
val subdf2 =  // All records from df4 where id > 0

* Then some more operations on subdf1 and subdf2 which won't trigger spark evaluation yet*

// Write out subdf1
// Write out subdf2

假设我从主数据帧开始 df1(我懒惰地从 CSV 中读取),对该数据帧执行一些操作(过滤、分组、连接),然后是我拆分该数据帧的点基于条件(例如,id > 0 和 id < 0)。然后我进一步对这些子数据帧进行操作(让我们命名这些subdf1, subdf2)并最终写出两个子数据帧。

请注意,write 函数是触发 spark 评估的唯一命令,其余函数(filter、groupby、join)导致延迟评估。

现在当我写出 subdf1 时,我很清楚惰性评估开始了,所有的语句都从读取 CSV 开始评估以创建 df1。

当我们开始写 subdf2 时,我的问题就来了。 spark 是否理解 df4 处的代码差异并在遇到写出 subdf1 的命令时存储此数据帧?或者它会再次从创建 df1 的第一行开始并重新评估所有中间数据帧吗? 如果是这样,cache 数据框 df4(假设我有足够的内存)是个好主意吗?

如果重要的话,我正在使用 scala spark。 任何帮助将不胜感激。

不,Spark 无法从您的代码中推断出这一点。一切都会重新开始。要确认这一点,您可以执行 subdf1.explain()subdf2.explain(),您应该会看到两个数据帧都有从读取 df1 的开头开始的查询计划。

所以你是对的,如果你有足够的内存,你应该缓存 df4 以避免重做从 df1 开始的所有计算。当然,如果您不再需要 df4 进行任何进一步的计算,请记住在最后执行 df4.unpersist() 来取消坚持。