Scala/Spark: join on a second key if the first key doesn't exist in one of the dataframes
I have two dataframes:
RegionValues:
+-----------+----------+----------------------+
|marketplace|primary_id|values |
+-----------+----------+----------------------+
|xyz |0000000001|[cat, dog, cow] |
|reg |PRT0000001|[hippo, dragon, moose]|
|asz |0000001333|[mouse, rhino, lion] |
+-----------+----------+----------------------+
Marketplace:
+----------+-----------+----------+
|primary_id|marketplace|parent_id |
+----------+-----------+----------+
|0000000001|xyz |PRT0000001|
|0000000002|wrt |PRT0000001|
|PRT0000001|reg |PRT0000001|
|PRT00MISS0|asz |PRT00MISS0|
|000000000B|823 |PRT0000002|
+----------+-----------+----------+
When I join the dataframes together, I want to join them on the primary_id value, but if a primary_id is not present in the RegionValues dataframe, then I want to fall back to joining on parent_id === primary_id. So my desired output is:
+----------+-----------+----------+----------------------+
|primary_id|marketplace|parent_id |values                |
+----------+-----------+----------+----------------------+
|0000000001|...        |PRT0000001|[cat, dog, cow]       |
|0000000002|...        |PRT0000001|[hippo, dragon, moose]|
|PRT0000001|...        |PRT0000001|[hippo, dragon, moose]|
|PRT00MISS0|           |PRT00MISS0|null                  |
|0000001333|           |0000001333|[mouse, rhino, lion]  |
|000000000B|           |PRT0000002|null                  |
+----------+-----------+----------+----------------------+
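The fallback rule I want can be sketched in plain Scala (no Spark) with RegionValues modeled as a Map; this is only an illustration of the intended semantics, not the join itself:

```scala
// Plain-Scala sketch of the fallback rule: look up a row's primary_id in
// RegionValues first, then fall back to its parent_id; None stands in for
// the null `values` cell when neither key matches.
object FallbackSketch {
  // RegionValues as a Map: primary_id -> values
  val regionValues: Map[String, Seq[String]] = Map(
    "0000000001" -> Seq("cat", "dog", "cow"),
    "PRT0000001" -> Seq("hippo", "dragon", "moose"),
    "0000001333" -> Seq("mouse", "rhino", "lion")
  )

  def resolve(primaryId: String, parentId: String): Option[Seq[String]] =
    regionValues.get(primaryId).orElse(regionValues.get(parentId))
}
```

For example, `FallbackSketch.resolve("0000000002", "PRT0000001")` falls back to the parent's values because 0000000002 is absent from the map.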
Note that 0000000001 keeps its original values, but 0000000002 takes on the values of its parent_id because 0000000002 does not exist in RegionValues. Is it possible to accomplish this logic within the join statement? I'm using Scala and Spark.
I have tried using a join condition like this, but it results in null values for 0000000002:
val parentIdJoinCondition = when(
  (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull,
  marketplaceDf.col("parent_id") === regionValuesDf.col("primary_id")
).otherwise(regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id"))

val joinedDf = regionDf.join(
  marketplaceDf,
  parentIdJoinCondition,
  "outer"
)
I think I could get my desired result by using 3 distinct joins, but that seems unnecessary and is harder to read.
Wouldn't using regionValuesDf.col("primary_id") =!= marketplaceDf.col("primary_id") instead of (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull in your join statement help?
Creating a custom condition like this causes Spark to perform a cross join, which is a very inefficient way to join. Moreover, Spark cannot know that the columns do not match before actually performing the join, so your condition (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull will always return false.
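The reason is SQL's three-valued equality, which Column === follows row by row; a plain-Scala model of it (no Spark, None standing in for NULL) shows that comparing two non-null values yields true or false, never NULL, so `.isNull` on an equality can never signal "no matching row exists":

```scala
// Plain-Scala model of SQL three-valued equality: the comparison is NULL
// (None) only when one of its inputs is NULL, otherwise it is a plain
// true/false -- a mismatch between two real keys is false, not null.
object SqlEqSketch {
  def sqlEq(a: Option[String], b: Option[String]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y
}
```

So for two non-null ids that differ, `sqlEq` returns `Some(false)`, and the `when(...isNull, ...)` branch is never taken.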
So, as you guessed, the best solution is to perform several joins. You can get it down to two: a first join that determines whether the outer join should use the primary_id or the parent_id value, then the actual outer join. After that, you coalesce primary_id, marketplace, and parent_id, and drop the useless columns.
So the code is:
import org.apache.spark.sql.functions.{coalesce, col, when}

val joinedDf = marketplaceDf.join(regionDf.drop("marketPlace"), Seq("primary_id"), "left")
  .withColumn("join_key", when(col("values").isNotNull, col("primary_id")).otherwise(col("parent_id")))
  .drop("values")
  .join(
    regionDf
      .withColumnRenamed("primary_id", "join_key")
      .withColumnRenamed("marketplace", "region_marketplace"),
    Seq("join_key"),
    "outer"
  )
  .withColumn("primary_id", coalesce(col("primary_id"), col("join_key")))
  .withColumn("parent_id", coalesce(col("parent_id"), col("join_key")))
  .withColumn("marketplace", coalesce(col("marketplace"), col("region_marketplace")))
  .drop("join_key", "region_marketplace")
This gives you the following joinedDf dataframe:
+----------+-----------+----------+----------------------+
|primary_id|marketplace|parent_id |values |
+----------+-----------+----------+----------------------+
|0000000001|xyz |PRT0000001|[cat, dog, cow] |
|0000001333|asz |0000001333|[mouse, rhino, lion] |
|0000000002|wrt |PRT0000001|[hippo, dragon, moose]|
|PRT0000001|reg |PRT0000001|[hippo, dragon, moose]|
|000000000B|823 |PRT0000002|null |
|PRT00MISS0|asz |PRT00MISS0|null |
+----------+-----------+----------+----------------------+
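As a sanity check, the row set above can be re-traced in plain Scala collections (no Spark): Marketplace-driven rows resolve values with the parent_id fallback, and RegionValues rows reachable from no Marketplace key survive as the outer part with their ids coalesced to the join key:

```scala
// Plain-Scala trace of the two-step logic, checking it yields the six
// (primary_id, parent_id, values) rows of the result table.
object OuterFallbackTrace {
  val regionValues: Map[String, Seq[String]] = Map(
    "0000000001" -> Seq("cat", "dog", "cow"),
    "PRT0000001" -> Seq("hippo", "dragon", "moose"),
    "0000001333" -> Seq("mouse", "rhino", "lion")
  )

  // (primary_id, marketplace, parent_id) rows of the Marketplace dataframe
  val marketplace: Seq[(String, String, String)] = Seq(
    ("0000000001", "xyz", "PRT0000001"),
    ("0000000002", "wrt", "PRT0000001"),
    ("PRT0000001", "reg", "PRT0000001"),
    ("PRT00MISS0", "asz", "PRT00MISS0"),
    ("000000000B", "823", "PRT0000002")
  )

  // Marketplace-driven rows: primary_id lookup with parent_id fallback
  val fromMarketplace: Seq[(String, String, Option[Seq[String]])] =
    marketplace.map { case (pid, _, parent) =>
      (pid, parent, regionValues.get(pid).orElse(regionValues.get(parent)))
    }

  // RegionValues rows matched by no Marketplace key (the outer part);
  // their primary_id and parent_id coalesce to the join key
  val usedKeys: Set[String] =
    marketplace.flatMap { case (pid, _, parent) => Seq(pid, parent) }.toSet
  val regionOnly: Seq[(String, String, Option[Seq[String]])] =
    regionValues.toSeq.collect {
      case (id, vs) if !usedKeys(id) => (id, id, Some(vs))
    }

  val result: Seq[(String, String, Option[Seq[String]])] =
    fromMarketplace ++ regionOnly
}
```

Only 0000001333 falls into the outer-only part here, giving six rows in total, matching the table.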