Spark 数据集 joinWith API 给出了错误的结果

Spark Dataset joinWith API giving wrong results

这是一个小测试用例,用于重现我在加入代码时遇到的问题

case class B(val b1:String, val b2: Int)
val B1 = new B("One",1)
val B2 = new B("Two",2)
val dsB = spark.createDataset(Seq(B1, B2))
dsB.show()
+---+---+
| b1| b2|
+---+---+
|One|  1|
|Two|  2|
+---+---+
val m = Map(1->"Van")
val mapget = spark.udf.register("mapget",  (b: Int) => m.get(b))
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null")
dsB1.show()
+---+---+
| b1| b2|
+---+---+
|One|Van|
+---+---+
val j = dsB1.joinWith(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
+----------+--------+
|        _1|      _2|
+----------+--------+
|[One, Van]|[One, 1]|
|[One, Van]|[Two, 2]|
+----------+--------+

joinWith结果不对。它本质上是在做叉积。任何线索是什么问题?我已经验证 join API 工作正常。

val j = dsB1.join(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
+---+---+---+---+
| b1| b2| b1| b2|
+---+---+---+---+
|One|Van|One|  1|
+---+---+---+---+

看起来您使用的是相当旧的 Spark 版本。 在 Spark 2.4.4 上,当 运行 您的示例时出现以下异常:

org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
LocalRelation [_1#55]
and
LocalRelation [_2#56]
Join condition is missing or trivial.

原因是连接条件实际上将 dsB("b1") 与自身进行比较,并且始终为真。

一个简单的解决方案是重命名该列。类似的东西:

val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null").withColumnRenamed("b1", "b1_2")
val j = dsB1.joinWith(dsB, dsB1("b1_2") === dsB("b1"), "inner")
j.show
+----------+--------+
|        _1|      _2|
+----------+--------+
|[One, Van]|[One, 1]|
+----------+--------+