使用 spark-sql GROUP BY 解决性能和内存问题

Working Around Performance & Memory Issues with spark-sql GROUP BY

考虑以下运行一个GROUP BY的例子,聚合数和组数都比较多:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._

val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt

case class Data(A: Long = (math.random*num_groups).toLong)

val table = (1 to num_rows).map(i => Data()).toDF

val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")

// Write the result to make sure everyting is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")

这个作业的输入只有 8MB,输出大约 2.4GB,我 运行 在一个集群上,这个集群有三台工作机器,每台机器有 61GB 内存。结果:所有 worker 都因 OutOfMemory 异常而崩溃。 即使 num_columns 的值较低,由于 GC 开销,作业也会变得异常缓慢。

我们尝试过的事情包括:

有没有更好的方法达到预期的效果?

由于我不确定你使用的是哪种聚合函数,所以很难说spark在后台做了什么。在任何情况下,为了对每个聚合函数有更多的控制,我会 运行 对基本 RDD 本身上的每个函数进行 reduceByKey 转换。然后,如有必要,您可以简单地加入结果。这样你就有了更多的控制权,并且可以看到你最喜欢聚合中的哪一个 "cost",另外你可以避免 group by 操作,除了洗牌之外,它还会导致内存问题(由于整个数据集到一个分区中)。以下是一个简短的说明,其中 aggrigationFunctions 是聚合函数的列表及其 ID 和实际函数(元组列表)。

val aggrigationResults = aggrigationFunctions.map( 
   f => {
     val aggRes = baseRdd
                     .map(x => (x.[the field to group by], x.[the value to aggrigate]))
                     .reduceByKey(f.func)
     (f.id, aggRes)
   }
)

一般来说,解决此类问题的几乎通用解决方案是将分区大小保持在合理的大小。虽然 "reasonable" 有点主观,并且因情况而异,但 100-200MB 看起来是个不错的起点。

我可以轻松汇总您提供的关于单个工作人员的示例数据,保持默认 spark.executor.memory (1GB) 并将总可用资源限制为 8 个内核和 8GB RAM。所有这些都是通过使用 50 个分区并将聚合时间保持在 3 秒左右而无需任何特殊调整(这在 1.5.2 到 2.0.0 之间或多或少是一致的)。

总结一下:如果可能,增加 spark.default.parallelism 或在创建 DataFrame 时明确设置分区数。对于像这样的小型数据集,默认 spark.sql.shuffle.partitions 应该足够了。