Apache Spark Dataframe 如何在使用 groupBy 时关闭部分聚合?
Apache Spark Dataframe How to turn off partial aggregation when using groupBy?
在 Spark 3.1.1 中,我在 DataFrame 上做了一个没有 distinct 的 groupBy。我尝试使用
关闭部分聚合
spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")
然后运行查询
df.groupBy("method").agg(sum("request_body_len"))
Spark 仍然会按照物理计划中所示进行部分聚合。
== Physical Plan ==
*(2) HashAggregate(keys=[method#23], functions=[sum(cast(request_body_len#28 as bigint))], output=[method#23, sum(request_body_len)#142L])
+- Exchange hashpartitioning(method#23, 200), ENSURE_REQUIREMENTS, [id=#58]
+- *(1) HashAggregate(keys=[method#23], functions=[partial_sum(cast(request_body_len#28 as bigint))], output=[method#23, sum#146L])
+- FileScan csv [method#23,request_body_len#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/mnt/http.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<method:string,request_body_len:int>
我在 Youtube 上观看了这个视频后尝试了这个:https://youtu.be/_Ne27JcLnEc @56:53
这个功能在最新的 Spark 中不再可用还是我遗漏了什么?
配置 spark.sql.aggregate.partialaggregate.skip.enabled
在 spark 源代码中不存在。
所有 spark.sql.*
配置都在 SQLConf 对象中定义(而这个不存在):
https://github.com/apache/spark/blob/651904a2ef57d2261ea6e256e4f3cdd26aa1b69d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
据我检查,此配置在以前的版本中也不可用。
在您分享的视频中,相关幻灯片中有一个 link PR 可以跳过部分聚合,但它似乎没有合并。
有一个 spark 项目的 PR 解决了这个问题并添加了这个配置(也许这与视频中出现的 PR 相同,我不知道)但它被关闭了:
https://github.com/apache/spark/pull/28804
在 Spark 3.1.1 中,我在 DataFrame 上做了一个没有 distinct 的 groupBy。我尝试使用
关闭部分聚合spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")
然后运行查询
df.groupBy("method").agg(sum("request_body_len"))
Spark 仍然会按照物理计划中所示进行部分聚合。
== Physical Plan ==
*(2) HashAggregate(keys=[method#23], functions=[sum(cast(request_body_len#28 as bigint))], output=[method#23, sum(request_body_len)#142L])
+- Exchange hashpartitioning(method#23, 200), ENSURE_REQUIREMENTS, [id=#58]
+- *(1) HashAggregate(keys=[method#23], functions=[partial_sum(cast(request_body_len#28 as bigint))], output=[method#23, sum#146L])
+- FileScan csv [method#23,request_body_len#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/mnt/http.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<method:string,request_body_len:int>
我在 Youtube 上观看了这个视频后尝试了这个:https://youtu.be/_Ne27JcLnEc @56:53
这个功能在最新的 Spark 中不再可用还是我遗漏了什么?
配置 spark.sql.aggregate.partialaggregate.skip.enabled
在 spark 源代码中不存在。
所有 spark.sql.*
配置都在 SQLConf 对象中定义(而这个不存在):
https://github.com/apache/spark/blob/651904a2ef57d2261ea6e256e4f3cdd26aa1b69d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
据我检查,此配置在以前的版本中也不可用。
在您分享的视频中,相关幻灯片中有一个 link PR 可以跳过部分聚合,但它似乎没有合并。
有一个 spark 项目的 PR 解决了这个问题并添加了这个配置(也许这与视频中出现的 PR 相同,我不知道)但它被关闭了: https://github.com/apache/spark/pull/28804