当连接键是 bucketBy 键的超集时,如何说服 spark 不进行交换?

How can I convince spark not to make an exchange when the join key is a super-set of the bucketBy key?

在测试生产用例时,我创建并保存了(使用 Hive Metastore)这样的 tables:

table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets

table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets

我是运行这样的查询(伪代码)

table1.join(table2, [“key1”, “key2”])
 .groupBy(“value2”)
 .countUnique(“key1”)

常识告诉我们,这个连接应该简单地通过没有交换的排序合并连接来完成;但是 spark 进行交换然后加入。

即使对于这个特定的用例,由于我需要按 key1 存储的其他一些用例,我也可以用两个键存储。当我使用这样的单个键进行(更简单的)连接时:

table1.join(table2, [“key1”])

它按预期工作(即没有交换的排序合并连接)。

现在我已经对这些 table 进行了优化连接,如果我想像这样进行过滤:

table1.join(table2, [“key1”])
 .filter(table1.col(“key2”) == table2.col(“key2”))

它恢复到交易所然后加入。

当连接键是 bucketBy 键的超集时,如何说服 spark 不进行交换?

注:

我知道的一个技巧是,如果我将重写为不等式检查,那么 spark 不会洗牌。

(x == y) 也可以表示为 ((x >= y) & ( x <= y))。如果我在最后一个例子中应用两个像这样的过滤器:

.filter(table1.col(“key2”) >= table2.col(“key2”))

.filter(table1.col(“key2”) <= table2.col(“key2”))

它将继续使用没有交换的排序合并连接,但这不是解决方案,这是一个 hack。

根据一些研究和探索,这似乎是最简单的解决方案:

基于此示例:

table1.join(table2, [“key1”])
      .filter(table1.col(“key2”) == table2.col(“key2”))

而不是使用来自 Spark 的 equalTo (==),实现自定义 MyEqualTo(通过委托给 spark EqualTo 实现很好)似乎解决了这个问题。这样,spark 就不会优化(!)连接,它只会将过滤器拉入 SortMergeJoin。

同理,连接条件也可以这样构成:

(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))

org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin 是通过 Join 推送谓词的优化器规则。 ~~
我们可以从优化器规则中排除这条规则。这样我们就不必对用户代码进行任何更改。
要排除,我们可以执行以下操作之一
1. --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin.
2. 在spark-defaults.conf.
中添加属性 3.在用户代码中添加set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin

再一次,这又是一个黑客。 .
理想情况下,过滤器应该通过连接下推,这样可以减少要连接的行数

更新: .
1.我对下推的看法是错误的。将没有过滤器下推,因为谓词具有来自两个表的列。
2.where 子句有一个 "non-equality" 谓词时,为什么 SortMergeJoin(SMJ) 不添加额外的交换?
这是因为 SMJ 只能将 equality-based 谓词视为连接条件 org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys#unapply

的一部分

而负责添加交易所的 EnsureRequirements 看到 SMJ 没有新的加入条件并且输出分布已经满足。
代码:org.apache.spark.sql.execution.exchange.EnsureRequirements#ensureDistributionAndOrdering .
3. 哪个更有效 - 添加一个执行等于或将谓词表示为大于和小于的组合的 UDF? .
为了对此进行评估,我使用

检查了生成的代码
val df = spark.sql(<joinquery>)
df.queryExecution.debug.codegen

一个。 UDF equals - 涉及虚函数调用的附加开销。
b.小于和大于的组合 - 没有虚函数调用。一旦我们找到连接的行(使用 key1),代码就会逐个检查其他谓词。

从上面 3 中的观察来看,使用基于 non-equality 的谓词似乎更有效。

**基于你的伪代码**

table1.join(table2, [“key1”, “key2”]) .groupBy(“值2”) .countUnique(“key1”)

我想解决方案是

as a first step just join the tables and get the data frame.

df = table1.join(table2, [“key1”, “key2”])

then group by and do distinct counts

df.select(“value2”,“key1”).distinct().groupBy(“value2”,“key1”).count().show()

我遇到了同样的问题。 好像有PR完结了,正好解决了这个问题

(公关)https://github.com/apache/spark/pull/19054

(Jira 票)https://issues.apache.org/jira/browse/SPARK-18067

但我本以为它已经包含在内(我使用的是 Spark 3.0.0,问题仍然存在,而票证已于 2019 年 5 月 21 日解决,比 Spark3 发布早一年多)。

感谢使用不等式运算符的“hack”,感觉不太好,但这是一个简单的解决方法。我也会尝试用 PR 中的解决方案修补我的 spark 版本,但如果我想分享我的代码,这就更少了 sustainable/reproducable。