如何在减少之前避免大的中间结果?

How to avoid large intermediate result before reduce?

我在 spark 作业中遇到了一个令我吃惊的错误:

 Total size of serialized results of 102 tasks (1029.6 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)

我的工作是这样的:

def add(a,b): return a+b
sums = rdd.mapPartitions(func).reduce(add)

rdd 有约 500 个分区,func 获取该分区中的行,returns 是一个大数组(一个 1.3M 双精度数组,或 ~10Mb)。 我想总结所有这些结果和 return 它们的总和。

Spark 似乎将 mapPartitions(func) 的总结果保存在内存中(大约 5gb),而不是增量处理它,这只需要大约 30Mb。

不是增加 spark.driver.maxResultSize,有没有一种方法可以更增量地执行减少?


更新:实际上,我有点惊讶内存中保存了两个以上的结果。

当使用 reduce 时,Spark 对驱动程序应用最终归约。如果 func returns 单个对象,这实际上等同于:

reduce(add, rdd.collect())

您可以使用 treeReduce:

import math

# Keep maximum possible depth
rdd.treeReduce(add, depth=math.log2(rdd.getNumPartitions()))

toLocalIterator:

sum(rdd.toLocalIterator())

前者将以增加网络交换为代价递归地合并工作节点上的分区。您可以使用 depth 参数调整性能。

后者当时只会收集一个分区,但可能需要重新评估 rdd 并且大部分工作将由驱动程序执行。

根据 func 中使用的确切逻辑,您还可以通过将矩阵拆分为块并按块执行加法来改善工作分配,例如使用 BlockMatrices