Spark Dataset joinWith API giving wrong results
Here is a small test case that reproduces a problem I am hitting in my join code:
case class B(b1: String, b2: Int)
val B1 = B("One", 1)
val B2 = B("Two", 2)
val dsB = spark.createDataset(Seq(B1, B2))
dsB.show()
+---+---+
| b1| b2|
+---+---+
|One| 1|
|Two| 2|
+---+---+
val m = Map(1->"Van")
val mapget = spark.udf.register("mapget", (b: Int) => m.get(b))
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null")
dsB1.show()
+---+---+
| b1| b2|
+---+---+
|One|Van|
+---+---+
val j = dsB1.joinWith(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
+----------+--------+
| _1| _2|
+----------+--------+
|[One, Van]|[One, 1]|
|[One, Van]|[Two, 2]|
+----------+--------+
The joinWith result is wrong: it is essentially producing a cross product. Any clue what the problem is? I have verified that the plain join API works correctly:
val j = dsB1.join(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
+---+---+---+---+
| b1| b2| b1| b2|
+---+---+---+---+
|One|Van|One| 1|
+---+---+---+---+
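For reference, the result I expect can be checked with plain Scala collections. This is just a Spark-free sketch of the same lookup-filter-join logic, to show that only one pair should come out:

```scala
case class B(b1: String, b2: Int)

val rows = Seq(B("One", 1), B("Two", 2))
val m = Map(1 -> "Van")

// Same idea as dsB1: map b2 through the lookup, keeping only the hits.
val left = rows.flatMap(r => m.get(r.b2).map(v => (r.b1, v)))

// Same idea as joinWith on b1: pair each left row with matching rows.
val joined = for {
  (lb1, lv) <- left
  r <- rows if r.b1 == lb1
} yield ((lb1, lv), r)

// joined contains only the single pair (("One", "Van"), B("One", 1))
```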
It looks like you are using a fairly old Spark version.
On Spark 2.4.4, running your example throws the following exception instead:
org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
LocalRelation [_1#55]
and
LocalRelation [_2#56]
Join condition is missing or trivial.
The reason is that the join condition effectively compares dsB("b1") with itself and is therefore always true: dsB1 is derived from dsB, so both column references resolve to the same underlying attribute.
A simple workaround is to rename the column on one side. Something like:
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null").withColumnRenamed("b1", "b1_2")
val j = dsB1.joinWith(dsB, dsB1("b1_2") === dsB("b1"), "inner")
j.show
+----------+--------+
| _1| _2|
+----------+--------+
|[One, Van]|[One, 1]|
+----------+--------+
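Another option, if you prefer not to rename the column, is to give each side an alias and disambiguate the join columns by alias. This is a sketch relying on Dataset.alias and col-based resolution; I have not checked it on every Spark version:

```scala
import org.apache.spark.sql.functions.col

// Alias each side so "b1" can be resolved unambiguously per side.
val left = dsB1.alias("l")
val right = dsB.alias("r")
val j = left.joinWith(right, col("l.b1") === col("r.b1"), "inner")
```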