使用 spark-sql GROUP BY 解决性能和内存问题
Working Around Performance & Memory Issues with spark-sql GROUP BY
考虑以下运行一个GROUP BY
的例子,聚合数和组数都比较多:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._
val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt
case class Data(A: Long = (math.random*num_groups).toLong)
val table = (1 to num_rows).map(i => Data()).toDF
val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")
// Write the result to make sure everyting is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")
这个作业的输入只有 8MB,输出大约 2.4GB,我 运行 在一个集群上,这个集群有三台工作机器,每台机器有 61GB 内存。结果:所有 worker 都因 OutOfMemory 异常而崩溃。
即使 num_columns
的值较低,由于 GC 开销,作业也会变得异常缓慢。
我们尝试过的事情包括:
- 减小分区大小(减少内存占用但增加簿记开销)
- 在进行聚合之前使用 HashPartitioner 对数据进行预分区(减少内存消耗,但在任何实际工作发生之前需要完全重新洗牌)
有没有更好的方法达到预期的效果?
由于我不确定你使用的是哪种聚合函数,所以很难说spark在后台做了什么。在任何情况下,为了对每个聚合函数有更多的控制,我会 运行 对基本 RDD 本身上的每个函数进行 reduceByKey 转换。然后,如有必要,您可以简单地加入结果。这样你就有了更多的控制权,并且可以看到你最喜欢聚合中的哪一个 "cost",另外你可以避免 group by 操作,除了洗牌之外,它还会导致内存问题(由于整个数据集到一个分区中)。以下是一个简短的说明,其中 aggrigationFunctions 是聚合函数的列表及其 ID 和实际函数(元组列表)。
val aggrigationResults = aggrigationFunctions.map(
f => {
val aggRes = baseRdd
.map(x => (x.[the field to group by], x.[the value to aggrigate]))
.reduceByKey(f.func)
(f.id, aggRes)
}
)
一般来说,解决此类问题的几乎通用解决方案是将分区大小保持在合理的大小。虽然 "reasonable" 有点主观,并且因情况而异,但 100-200MB 看起来是个不错的起点。
我可以轻松汇总您提供的关于单个工作人员的示例数据,保持默认 spark.executor.memory
(1GB) 并将总可用资源限制为 8 个内核和 8GB RAM。所有这些都是通过使用 50 个分区并将聚合时间保持在 3 秒左右而无需任何特殊调整(这在 1.5.2 到 2.0.0 之间或多或少是一致的)。
总结一下:如果可能,增加 spark.default.parallelism
或在创建 DataFrame
时明确设置分区数。对于像这样的小型数据集,默认 spark.sql.shuffle.partitions
应该足够了。
考虑以下运行一个GROUP BY
的例子,聚合数和组数都比较多:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._
val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt
case class Data(A: Long = (math.random*num_groups).toLong)
val table = (1 to num_rows).map(i => Data()).toDF
val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")
// Write the result to make sure everyting is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")
这个作业的输入只有 8MB,输出大约 2.4GB,我 运行 在一个集群上,这个集群有三台工作机器,每台机器有 61GB 内存。结果:所有 worker 都因 OutOfMemory 异常而崩溃。
即使 num_columns
的值较低,由于 GC 开销,作业也会变得异常缓慢。
我们尝试过的事情包括:
- 减小分区大小(减少内存占用但增加簿记开销)
- 在进行聚合之前使用 HashPartitioner 对数据进行预分区(减少内存消耗,但在任何实际工作发生之前需要完全重新洗牌)
有没有更好的方法达到预期的效果?
由于我不确定你使用的是哪种聚合函数,所以很难说spark在后台做了什么。在任何情况下,为了对每个聚合函数有更多的控制,我会 运行 对基本 RDD 本身上的每个函数进行 reduceByKey 转换。然后,如有必要,您可以简单地加入结果。这样你就有了更多的控制权,并且可以看到你最喜欢聚合中的哪一个 "cost",另外你可以避免 group by 操作,除了洗牌之外,它还会导致内存问题(由于整个数据集到一个分区中)。以下是一个简短的说明,其中 aggrigationFunctions 是聚合函数的列表及其 ID 和实际函数(元组列表)。
val aggrigationResults = aggrigationFunctions.map(
f => {
val aggRes = baseRdd
.map(x => (x.[the field to group by], x.[the value to aggrigate]))
.reduceByKey(f.func)
(f.id, aggRes)
}
)
一般来说,解决此类问题的几乎通用解决方案是将分区大小保持在合理的大小。虽然 "reasonable" 有点主观,并且因情况而异,但 100-200MB 看起来是个不错的起点。
我可以轻松汇总您提供的关于单个工作人员的示例数据,保持默认 spark.executor.memory
(1GB) 并将总可用资源限制为 8 个内核和 8GB RAM。所有这些都是通过使用 50 个分区并将聚合时间保持在 3 秒左右而无需任何特殊调整(这在 1.5.2 到 2.0.0 之间或多或少是一致的)。
总结一下:如果可能,增加 spark.default.parallelism
或在创建 DataFrame
时明确设置分区数。对于像这样的小型数据集,默认 spark.sql.shuffle.partitions
应该足够了。