Spark: specify multiple column conditions for DataFrame join
How can I give more column conditions when joining two DataFrames? For example, I want to run the following:
val Lead_all = Leads.join(Utm_Master,
  Leaddetails.columns("LeadSource", "Utm_Source", "Utm_Medium", "Utm_Campaign") ==
  Utm_Master.columns("LeadSource", "Utm_Source", "Utm_Medium", "Utm_Campaign"),
  "left")
I only want to join when these columns match. But the above syntax is not valid, since cols only takes one string. So how do I get what I want?
One thing you can do is to use raw SQL:
case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)

val bar = sqlContext.createDataFrame(sc.parallelize(
  Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
  Bar(3, 1, 2, "bar") :: Nil))

val foo = sqlContext.createDataFrame(sc.parallelize(
  Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
  Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))

// Register both DataFrames as temporary tables so they can be queried with SQL.
foo.registerTempTable("foo")
bar.registerTempTable("bar")

sqlContext.sql(
  "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")
There is also a Spark column/expression API for this kind of join:
Leaddetails.join(
  Utm_Master,
  Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
    && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
    && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
    && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
  "left"
)
The <=> operator in the example means "equality test that is safe for null values". The main difference from the simple equality test (===) is that the first one is safe to use when one of the columns may have null values.
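To see the difference concretely, here is a minimal sketch (toy data, hypothetical names) where the join key is NULL on both sides:

import spark.implicits._  // assumes an in-scope SparkSession named spark

val left  = Seq((Some(1), "a"), (None, "b")).toDF("key", "lv")
val right = Seq((Some(1), "x"), (None, "y")).toDF("key", "rv")

// With ===, NULL = NULL evaluates to NULL, so the NULL-keyed rows never match.
left.join(right, left("key") === right("key")).show()   // 1 matching row

// With <=>, NULL <=> NULL evaluates to true, so the NULL-keyed rows do match.
left.join(right, left("key") <=> right("key")).show()   // 2 matching rows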
As of Spark version 1.5.0 (unreleased at the time of writing), you can join on multiple DataFrame columns. Refer to SPARK-7990: Add methods to facilitate equi-join on multiple join keys.
Python
Leads.join(
    Utm_Master,
    ["LeadSource", "Utm_Source", "Utm_Medium", "Utm_Campaign"],
    "left_outer"
)
Scala
The question asked for a Scala answer, but I don't use Scala. Here is my best guess....
Leads.join(
  Utm_Master,
  Seq("LeadSource", "Utm_Source", "Utm_Medium", "Utm_Campaign"),
  "left_outer"
)
Scala:
Leaddetails.join(
  Utm_Master,
  Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
    && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
    && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
    && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
  "left"
)
To make it case insensitive,

import org.apache.spark.sql.functions.{lower, upper}

then just use lower(value) in the condition of the join method.

E.g.: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))
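Applied to a join rather than a filter, a sketch might look like this (ordersDf and vendorsDf are hypothetical DataFrames):

import org.apache.spark.sql.functions.lower

// Hypothetical case-insensitive equi-join: normalize both sides with lower()
// directly inside the join condition.
val joined = ordersDf.join(
  vendorsDf,
  lower(ordersDf("vendor")) === lower(vendorsDf("vendor")),
  "inner"
)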
In Pyspark, you can simply specify each condition separately:

Lead_all = Leads.join(
    Utm_Master,
    (Leaddetails.LeadSource == Utm_Master.LeadSource) &
    (Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
    (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
    (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))
Just be sure to use the operators and parentheses correctly: in Python, & binds more tightly than ==, so each comparison needs its own parentheses.
Spark SQL supports joins on tuples of columns when they are in parentheses, like

... WHERE (list_of_columns1) = (list_of_columns2)

which is way shorter than specifying an equality expression (=) for each pair of columns, combined by a set of "AND"s.
For example:
SELECT a, b, c
FROM tab1 t1
WHERE NOT EXISTS (
  SELECT 1
  FROM t1_except_t2_df e
  WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
)
instead of
SELECT a, b, c
FROM tab1 t1
WHERE NOT EXISTS (
  SELECT 1
  FROM t1_except_t2_df e
  WHERE t1.a = e.a AND t1.b = e.b AND t1.c = e.c
)
which is also less readable, especially when the list of columns is big and you want to deal with NULLs easily.
The === option gave me duplicated columns, so I used Seq instead.
val Lead_all = Leads.join(Utm_Master,
  Seq("Utm_Source", "Utm_Medium", "Utm_Campaign"), "left")
Of course, this only works when the names of the joining columns are the same.
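A quick sketch of the difference (toy data, hypothetical names): an expression join keeps the key column from both sides, while a Seq-of-names join emits it once:

import spark.implicits._  // assumes an in-scope SparkSession named spark

val a = Seq((1, "a")).toDF("id", "av")
val b = Seq((1, "b")).toDF("id", "bv")

// Expression join: the result carries two "id" columns, one from each side.
a.join(b, a("id") === b("id")).columns   // Array(id, av, id, bv)

// Seq-of-names join: the shared key appears only once.
a.join(b, Seq("id")).columns             // Array(id, av, bv)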
Try this:
import org.apache.spark.sql.functions.col

val rccJoin = dfRccDeuda.as("dfdeuda")
  .join(dfRccCliente.as("dfcliente"),
    col("dfdeuda.etarcid") === col("dfcliente.etarcid"), "inner")
In Pyspark, putting parentheses around each condition is the key to using multiple column names in a join condition, because Python's & operator binds more tightly than ==.
joined_df = df1.join(
    df2,
    (df1['name'] == df2['name']) &
    (df1['phone'] == df2['phone'])
)