将此 sql 左连接查询转换为 spark 数据帧 (scala)
convert this sql left-join query to spark dataframes (scala)
我有这个 sql 查询,它是一个左连接并且在开头有一个 select 语句,它也从右边的 table 列中选择..
你能帮忙把它转换成spark数据帧并使用spark-shell得到结果吗?
我不想在 spark 中使用 sql 代码,而是想使用数据帧。
我知道 scala 中的连接语法,但我不知道如何从右侧选择 table(这里是 count(w.id2)),因为左侧连接的结果 df 没有可以访问右侧 table 的列。
谢谢!
select count(x.user_id) user_id_count, count(w.id2) current_id2_count
from
(select
user_id
from
tb1
where
year='2021'
and month=1
) x
left join
(select id1, max(id2) id2 from tb2 group by id1) w
on
x.user_id=w.id1;
在 spark 中,我将创建两个数据框 x 和 w 并加入它们:
var x = spark.sqlContext.table("tb1").where("year='2021' and month=1")
var w= spark.sqlContext.table("tb2").groupBy("id1").agg(max("id2")).alias("id2"
var joined = x.join(w, x("user_id")===w("id1"), "left")
编辑:
我对左连接感到困惑。 spark 中有一些错误,列 id2 不可用,我认为这是因为来自左连接的结果 df 将只剩下 table 的列。然而,原因是当我选择 max(id2) 时,我必须正确地给它一个别名。
这是一个示例和解决方案:
var x = Seq("1","2","3","4").toDF("user_id")
var w = Seq (("1", 1), ("1",2), ("3",10),("1",5),("5",4)).toDF("id1", "id2")
var z= w.groupBy("id1").agg(max("id2").alias("id2"))
val xJoinsZ= x.join(z, x("user_id") === z("id1"), "left").select(count(col("user_id").alias("user_id_count")), count(col("id2").alias("current_id2_count")))
scala> x.show(false)
+-------+
|user_id|
+-------+
|1 |
|2 |
|3 |
|4 |
+-------+
scala> z.show(false)
+---+---+
|id1|id2|
+---+---+
|3 |10 |
|5 |4 |
|1 |5 |
+---+---+
scala> xJoinsZ.show(false)
+---------------------------------+---------------------------------+
|count(user_id AS `user_id_count`)|count(id2 AS `current_id2_count`)|
+---------------------------------+---------------------------------+
|4 |2 |
+---------------------------------+---------------------------------+
您的要求很难理解,但我会尝试以您提供的 SQL 代码作为基准并使用 Spark 重现它来回复。
// Reading tb1 (x) and filtering for Jan 2021, selecting only "user_id"
val x: DataFrame = spark.read
.table("tb1")
.filter(col("year") === "2021")
.filter(col("mont") === "01")
.select("user_id")
// Reading tb2 (w) and for each "id1" getting the max "id2"
val w: DataFrame = spark.read
.table("tb2")
.groupBy(col("id1"))
.max("id2")
// Joining tb1 (x) and tb2 (w) on "user_id" === "id1", then counting user_id and id2
val xJoinsW: DataFrame = x
.join(w, x("user_id") === w("id1"), "left")
.select(count(col("user_id").as("user_id_count")), count(col("max(id2)").as("current_id2_count")))
一个小而相关的评论,因为您使用的是 Scala 和 Spark,我建议您使用 val
而不是 var
。 val
表示它是最终的,不能重新分配,而 var
可以稍后重新分配。您可以阅读更多 here.
最后,您可以随意更改 Spark 阅读机制。
我有这个 sql 查询,它是一个左连接并且在开头有一个 select 语句,它也从右边的 table 列中选择.. 你能帮忙把它转换成spark数据帧并使用spark-shell得到结果吗? 我不想在 spark 中使用 sql 代码,而是想使用数据帧。
我知道 scala 中的连接语法,但我不知道如何从右侧选择 table(这里是 count(w.id2)),因为左侧连接的结果 df 没有可以访问右侧 table 的列。
谢谢!
select count(x.user_id) user_id_count, count(w.id2) current_id2_count
from
(select
user_id
from
tb1
where
year='2021'
and month=1
) x
left join
(select id1, max(id2) id2 from tb2 group by id1) w
on
x.user_id=w.id1;
在 spark 中,我将创建两个数据框 x 和 w 并加入它们:
var x = spark.sqlContext.table("tb1").where("year='2021' and month=1")
var w= spark.sqlContext.table("tb2").groupBy("id1").agg(max("id2")).alias("id2"
var joined = x.join(w, x("user_id")===w("id1"), "left")
编辑: 我对左连接感到困惑。 spark 中有一些错误,列 id2 不可用,我认为这是因为来自左连接的结果 df 将只剩下 table 的列。然而,原因是当我选择 max(id2) 时,我必须正确地给它一个别名。
这是一个示例和解决方案:
var x = Seq("1","2","3","4").toDF("user_id")
var w = Seq (("1", 1), ("1",2), ("3",10),("1",5),("5",4)).toDF("id1", "id2")
var z= w.groupBy("id1").agg(max("id2").alias("id2"))
val xJoinsZ= x.join(z, x("user_id") === z("id1"), "left").select(count(col("user_id").alias("user_id_count")), count(col("id2").alias("current_id2_count")))
scala> x.show(false)
+-------+
|user_id|
+-------+
|1 |
|2 |
|3 |
|4 |
+-------+
scala> z.show(false)
+---+---+
|id1|id2|
+---+---+
|3 |10 |
|5 |4 |
|1 |5 |
+---+---+
scala> xJoinsZ.show(false)
+---------------------------------+---------------------------------+
|count(user_id AS `user_id_count`)|count(id2 AS `current_id2_count`)|
+---------------------------------+---------------------------------+
|4 |2 |
+---------------------------------+---------------------------------+
您的要求很难理解,但我会尝试以您提供的 SQL 代码作为基准并使用 Spark 重现它来回复。
// Reading tb1 (x) and filtering for Jan 2021, selecting only "user_id"
val x: DataFrame = spark.read
.table("tb1")
.filter(col("year") === "2021")
.filter(col("mont") === "01")
.select("user_id")
// Reading tb2 (w) and for each "id1" getting the max "id2"
val w: DataFrame = spark.read
.table("tb2")
.groupBy(col("id1"))
.max("id2")
// Joining tb1 (x) and tb2 (w) on "user_id" === "id1", then counting user_id and id2
val xJoinsW: DataFrame = x
.join(w, x("user_id") === w("id1"), "left")
.select(count(col("user_id").as("user_id_count")), count(col("max(id2)").as("current_id2_count")))
一个小而相关的评论,因为您使用的是 Scala 和 Spark,我建议您使用 val
而不是 var
。 val
表示它是最终的,不能重新分配,而 var
可以稍后重新分配。您可以阅读更多 here.
最后,您可以随意更改 Spark 阅读机制。