当连接键是 bucketBy 键的超集时,如何说服 spark 不进行交换?
How can I convince spark not to make an exchange when the join key is a super-set of the bucketBy key?
在测试生产用例时,我创建并保存了(使用 Hive Metastore)这样的 tables:
table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets
table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets
我是运行这样的查询(伪代码)
table1.join(table2, [“key1”, “key2”])
.groupBy(“value2”)
.countUnique(“key1”)
常识告诉我们,这个连接应该简单地通过没有交换的排序合并连接来完成;但是 spark 进行交换然后加入。
即使对于这个特定的用例,由于我需要按 key1 存储的其他一些用例,我也可以用两个键存储。当我使用这样的单个键进行(更简单的)连接时:
table1.join(table2, [“key1”])
它按预期工作(即没有交换的排序合并连接)。
现在我已经对这些 table 进行了优化连接,如果我想像这样进行过滤:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
它恢复到交易所然后加入。
当连接键是 bucketBy 键的超集时,如何说服 spark 不进行交换?
注:
我知道的一个技巧是,如果我将重写为不等式检查,那么 spark 不会洗牌。
(x == y) 也可以表示为 ((x >= y) & ( x <= y))。如果我在最后一个例子中应用两个像这样的过滤器:
.filter(table1.col(“key2”) >= table2.col(“key2”))
.filter(table1.col(“key2”) <= table2.col(“key2”))
它将继续使用没有交换的排序合并连接,但这不是解决方案,这是一个 hack。
根据一些研究和探索,这似乎是最简单的解决方案:
基于此示例:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
而不是使用来自 Spark 的 equalTo (==)
,实现自定义 MyEqualTo
(通过委托给 spark EqualTo
实现很好)似乎解决了这个问题。这样,spark 就不会优化(!)连接,它只会将过滤器拉入 SortMergeJoin。
同理,连接条件也可以这样构成:
(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin
是通过 Join 推送谓词的优化器规则。 ~~
我们可以从优化器规则中排除这条规则。这样我们就不必对用户代码进行任何更改。
要排除,我们可以执行以下操作之一
1. --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin
.
2. 在spark-defaults.conf.
中添加属性
3.在用户代码中添加set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin
。
再一次,这又是一个黑客。 .
理想情况下,过滤器应该通过连接下推,这样可以减少要连接的行数
更新: .
1.我对下推的看法是错误的。将没有过滤器下推,因为谓词具有来自两个表的列。
2.当 where 子句有一个 "non-equality" 谓词时,为什么 SortMergeJoin(SMJ) 不添加额外的交换?
这是因为 SMJ 只能将 equality-based 谓词视为连接条件 org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys#unapply
的一部分
而负责添加交易所的 EnsureRequirements 看到 SMJ 没有新的加入条件并且输出分布已经满足。
代码:org.apache.spark.sql.execution.exchange.EnsureRequirements#ensureDistributionAndOrdering .
3. 哪个更有效 - 添加一个执行等于或将谓词表示为大于和小于的组合的 UDF? .
为了对此进行评估,我使用
检查了生成的代码
val df = spark.sql(<joinquery>)
df.queryExecution.debug.codegen
一个。 UDF equals - 涉及虚函数调用的附加开销。
b.小于和大于的组合 - 没有虚函数调用。一旦我们找到连接的行(使用 key1),代码就会逐个检查其他谓词。
从上面 3 中的观察来看,使用基于 non-equality 的谓词似乎更有效。
**基于你的伪代码**
table1.join(table2, [“key1”, “key2”])
.groupBy(“值2”)
.countUnique(“key1”)
我想解决方案是
as a first step just join the tables and get the data frame.
df = table1.join(table2, [“key1”, “key2”])
then group by and do distinct counts
df.select(“value2”,“key1”).distinct().groupBy(“value2”,“key1”).count().show()
我遇到了同样的问题。
好像有PR完结了,正好解决了这个问题
(公关)https://github.com/apache/spark/pull/19054
(Jira 票)https://issues.apache.org/jira/browse/SPARK-18067
但我本以为它已经包含在内(我使用的是 Spark 3.0.0,问题仍然存在,而票证已于 2019 年 5 月 21 日解决,比 Spark3 发布早一年多)。
感谢使用不等式运算符的“hack”,感觉不太好,但这是一个简单的解决方法。我也会尝试用 PR 中的解决方案修补我的 spark 版本,但如果我想分享我的代码,这就更少了 sustainable/reproducable。
在测试生产用例时,我创建并保存了(使用 Hive Metastore)这样的 tables:
table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets
table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets
我是运行这样的查询(伪代码)
table1.join(table2, [“key1”, “key2”])
.groupBy(“value2”)
.countUnique(“key1”)
常识告诉我们,这个连接应该简单地通过没有交换的排序合并连接来完成;但是 spark 进行交换然后加入。
即使对于这个特定的用例,由于我需要按 key1 存储的其他一些用例,我也可以用两个键存储。当我使用这样的单个键进行(更简单的)连接时:
table1.join(table2, [“key1”])
它按预期工作(即没有交换的排序合并连接)。
现在我已经对这些 table 进行了优化连接,如果我想像这样进行过滤:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
它恢复到交易所然后加入。
当连接键是 bucketBy 键的超集时,如何说服 spark 不进行交换?
注:
我知道的一个技巧是,如果我将重写为不等式检查,那么 spark 不会洗牌。
(x == y) 也可以表示为 ((x >= y) & ( x <= y))。如果我在最后一个例子中应用两个像这样的过滤器:
.filter(table1.col(“key2”) >= table2.col(“key2”))
.filter(table1.col(“key2”) <= table2.col(“key2”))
它将继续使用没有交换的排序合并连接,但这不是解决方案,这是一个 hack。
根据一些研究和探索,这似乎是最简单的解决方案:
基于此示例:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
而不是使用来自 Spark 的 equalTo (==)
,实现自定义 MyEqualTo
(通过委托给 spark EqualTo
实现很好)似乎解决了这个问题。这样,spark 就不会优化(!)连接,它只会将过滤器拉入 SortMergeJoin。
同理,连接条件也可以这样构成:
(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin
是通过 Join 推送谓词的优化器规则。 ~~
我们可以从优化器规则中排除这条规则。这样我们就不必对用户代码进行任何更改。
要排除,我们可以执行以下操作之一
1. --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin
.
2. 在spark-defaults.conf.
中添加属性
3.在用户代码中添加set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin
。
再一次,这又是一个黑客。 .
理想情况下,过滤器应该通过连接下推,这样可以减少要连接的行数
更新: .
1.我对下推的看法是错误的。将没有过滤器下推,因为谓词具有来自两个表的列。
2.当 where 子句有一个 "non-equality" 谓词时,为什么 SortMergeJoin(SMJ) 不添加额外的交换?
这是因为 SMJ 只能将 equality-based 谓词视为连接条件 org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys#unapply
而负责添加交易所的 EnsureRequirements 看到 SMJ 没有新的加入条件并且输出分布已经满足。
代码:org.apache.spark.sql.execution.exchange.EnsureRequirements#ensureDistributionAndOrdering .
3. 哪个更有效 - 添加一个执行等于或将谓词表示为大于和小于的组合的 UDF? .
为了对此进行评估,我使用
val df = spark.sql(<joinquery>)
df.queryExecution.debug.codegen
一个。 UDF equals - 涉及虚函数调用的附加开销。
b.小于和大于的组合 - 没有虚函数调用。一旦我们找到连接的行(使用 key1),代码就会逐个检查其他谓词。
从上面 3 中的观察来看,使用基于 non-equality 的谓词似乎更有效。
**基于你的伪代码**
table1.join(table2, [“key1”, “key2”]) .groupBy(“值2”) .countUnique(“key1”)
我想解决方案是
as a first step just join the tables and get the data frame.
df = table1.join(table2, [“key1”, “key2”])
then group by and do distinct counts
df.select(“value2”,“key1”).distinct().groupBy(“value2”,“key1”).count().show()
我遇到了同样的问题。 好像有PR完结了,正好解决了这个问题
(公关)https://github.com/apache/spark/pull/19054
(Jira 票)https://issues.apache.org/jira/browse/SPARK-18067
但我本以为它已经包含在内(我使用的是 Spark 3.0.0,问题仍然存在,而票证已于 2019 年 5 月 21 日解决,比 Spark3 发布早一年多)。
感谢使用不等式运算符的“hack”,感觉不太好,但这是一个简单的解决方法。我也会尝试用 PR 中的解决方案修补我的 spark 版本,但如果我想分享我的代码,这就更少了 sustainable/reproducable。