如何使用单独的键列表来执行两个 DataFrame 之间的连接?
How to use separate key lists to perform a join between two DataFrames?
我想加入两个不同的 DataFrame(dfA
和 dfB
),构建如下:
dfA.show()
+-----+-------+-------+
| id_A| name_A|address|
+-----+-------+-------+
| 1| AAAA| Paris|
| 4| DDDD| Sydney|
+-----+-------+-------+
dfB.show()
+-----+-------+---------+
| id_B| name_B| job|
+-----+-------+---------+
| 1| AAAA| Analyst|
| 2| AERF| Engineer|
| 3| UOPY| Gardener|
| 4| DDDD| Insurer|
+-----+-------+---------+
我需要使用以下列表才能进行连接:
val keyListA = List("id_A", "name_A")
val keyListB = List("id_B", "name_B")
一个简单的解决方案是:
val join = dfA.join(
dfA("id_A") === dfB("id_B") &&
dfA("name_A") === dfB("name_B"),
"left_outer")
是否有允许您使用 keyListA
和 keyListB
列表执行此连接的语法?
如果您真的想从列名列表构建连接表达式:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
val dfA: DataFrame = ???
val dfB: DataFrame = ???
val keyListA = List("id_A", "name_A", "property1_A", "property2_A", "property3_A")
val keyListB = List("id_B", "name_B", "property1_B", "property2_B", "property3_B")
def joinExprsFrom(keyListA: List[String], keyListB: List[String]): Column =
keyListA
.zip(keyListB)
.map { case (fromA, fromB) => col(fromA) === col(fromB) }
.reduce((acc, expr) => acc && expr )
dfA.join(
dfB,
joinExprsFrom(keyListA, keyListB),
"left_outer")
您需要确保 keyListA
和 keyListB
大小相同且非空。
我想加入两个不同的 DataFrame(dfA
和 dfB
),构建如下:
dfA.show()
+-----+-------+-------+
| id_A| name_A|address|
+-----+-------+-------+
| 1| AAAA| Paris|
| 4| DDDD| Sydney|
+-----+-------+-------+
dfB.show()
+-----+-------+---------+
| id_B| name_B| job|
+-----+-------+---------+
| 1| AAAA| Analyst|
| 2| AERF| Engineer|
| 3| UOPY| Gardener|
| 4| DDDD| Insurer|
+-----+-------+---------+
我需要使用以下列表才能进行连接:
val keyListA = List("id_A", "name_A")
val keyListB = List("id_B", "name_B")
一个简单的解决方案是:
val join = dfA.join(
dfA("id_A") === dfB("id_B") &&
dfA("name_A") === dfB("name_B"),
"left_outer")
是否有允许您使用 keyListA
和 keyListB
列表执行此连接的语法?
如果您真的想从列名列表构建连接表达式:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
val dfA: DataFrame = ???
val dfB: DataFrame = ???
val keyListA = List("id_A", "name_A", "property1_A", "property2_A", "property3_A")
val keyListB = List("id_B", "name_B", "property1_B", "property2_B", "property3_B")
def joinExprsFrom(keyListA: List[String], keyListB: List[String]): Column =
keyListA
.zip(keyListB)
.map { case (fromA, fromB) => col(fromA) === col(fromB) }
.reduce((acc, expr) => acc && expr )
dfA.join(
dfB,
joinExprsFrom(keyListA, keyListB),
"left_outer")
您需要确保 keyListA
和 keyListB
大小相同且非空。