在加入 table 时左加入聚合函数和 select
left join with both aggregate function and select on joining table
我有一个一般的 SQL 查询问题,我认为可以用各种 SQL 方式回答,尽管我下面的示例使用的是 spark sql.
我正在尝试将 table 1 (t1
) 左连接到 table 2 (t2
),目标如下:
- 保留
t1
中的所有值(因此左连接)
- select
t2
中的一列基于 t2
中的聚合函数
测试数据如下:
t1
+---+--------+
|pk1|constant|
+---+--------+
| a|constant|
| b|constant|
| c|constant|
| d|constant|
+---+--------+
t2
+---+---------+------+
|fk1|condition|target|
+---+---------+------+
| a| 1|check1|
| a| 2|check2|
| b| 1|check1|
| b| 2|check2|
+---+---------+------+
这里有几个(失败的)示例查询:
spark.sql("""
select
pk1,
constant,
target
from
t1
left join
t2
on
pk1 = fk1
group by
pk1, constant, target
having
min(condition)
""").show
+---+--------+------+
|pk1|constant|target|
+---+--------+------+
| b|constant|check1|
| a|constant|check2|
| a|constant|check1|
| b|constant|check2|
+---+--------+------+
查询 1 有问题:我丢失了 'c'
、'd'
中 pk1
的 t1
行。对我来说它看起来像是内部连接而不是左连接。
spark.sql("""
select
pk1,
constant,
min(condition),
target
from
t1
left join
t2
on
pk1 = fk1
group by
pk1, constant, target
""").show
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
| a|constant| 1|check1|
| a|constant| 2|check2|
| b|constant| 2|check2|
| b|constant| 1|check1|
| c|constant| null| null|
| d|constant| null| null|
+---+--------+--------------+------+
查询 2 有问题:我不再对条件的最小值进行筛选。例如,对于 pk1 = a
,我采用了 condition = 1
和 condition = 2
。似乎没有应用 min 函数。
期望输出
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
| a|constant| 1|check1|
| b|constant| 1|check1|
| c|constant| null| null|
| d|constant| null| null|
+---+--------+--------------+------+
min(condition)
列是可选的。无论如何,我稍后会过滤掉它。
我可以通过将查询分成两个语句来创建所需的输出,但我觉得这里必须有一个优雅的单一查询解决方案。有人知道如何做到这一点吗?谢谢!
附录
以下是构建测试 table 的命令,以防有人想复制测试:
val columns1 = Seq("pk1", "constant")
val columns2 = Seq("fk1","condition","target")
val data1 = Seq( ("a","constant"), ("b","constant"), ("c","constant"), ("d","constant") )
val data2 = Seq( ("a",1,"check1"), ("a",2,"check2"), ("b",1,"check1"), ("b",2,"check2") )
val t1 = spark.createDataFrame(data1).toDF(columns1:_*)
val t2 = spark.createDataFrame(data2).toDF(columns2:_*)
先在fk1
上按t2
分组,在struct(condition, target)
上用min得到min条件对应的目标值,然后用t1
加入分组结果:
spark.sql("""
WITH t3 AS (
SELECT fk1,
MIN(struct(condition, target))['target'] AS target
FROM t2
GROUP BY fk1
)
SELECT pk1,
constant,
target
FROM t1
LEFT JOIN t3
ON pk1 = fk1
""").show
//+---+--------+------+
//|pk1|constant|target|
//+---+--------+------+
//| a|constant|check1|
//| b|constant|check1|
//| c|constant| null|
//| d|constant| null|
//+---+--------+------+
另一种使用row_number()
window函数的方法:
spark.sql("""
WITH t3 AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY fk1 ORDER BY condition) AS rn
FROM t2
)
SELECT pk1,
constant,
target
FROM t1
LEFT JOIN t3
ON pk1 = fk1
AND rn = 1
""").show
如果你想左连接然后做聚合:
spark.sql("""
SELECT pk1,
MAX(constant) AS constant,
MIN(struct(condition, target))['target'] AS target
FROM t1
LEFT JOIN t2
ON pk1 = fk1
GROUP BY pk1
""").show
我有一个一般的 SQL 查询问题,我认为可以用各种 SQL 方式回答,尽管我下面的示例使用的是 spark sql.
我正在尝试将 table 1 (t1
) 左连接到 table 2 (t2
),目标如下:
- 保留
t1
中的所有值(因此左连接) - select
t2
中的一列基于t2
中的聚合函数
测试数据如下:
t1
+---+--------+
|pk1|constant|
+---+--------+
| a|constant|
| b|constant|
| c|constant|
| d|constant|
+---+--------+
t2
+---+---------+------+
|fk1|condition|target|
+---+---------+------+
| a| 1|check1|
| a| 2|check2|
| b| 1|check1|
| b| 2|check2|
+---+---------+------+
这里有几个(失败的)示例查询:
spark.sql("""
select
pk1,
constant,
target
from
t1
left join
t2
on
pk1 = fk1
group by
pk1, constant, target
having
min(condition)
""").show
+---+--------+------+
|pk1|constant|target|
+---+--------+------+
| b|constant|check1|
| a|constant|check2|
| a|constant|check1|
| b|constant|check2|
+---+--------+------+
查询 1 有问题:我丢失了 'c'
、'd'
中 pk1
的 t1
行。对我来说它看起来像是内部连接而不是左连接。
spark.sql("""
select
pk1,
constant,
min(condition),
target
from
t1
left join
t2
on
pk1 = fk1
group by
pk1, constant, target
""").show
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
| a|constant| 1|check1|
| a|constant| 2|check2|
| b|constant| 2|check2|
| b|constant| 1|check1|
| c|constant| null| null|
| d|constant| null| null|
+---+--------+--------------+------+
查询 2 有问题:我不再对条件的最小值进行筛选。例如,对于 pk1 = a
,我采用了 condition = 1
和 condition = 2
。似乎没有应用 min 函数。
期望输出
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
| a|constant| 1|check1|
| b|constant| 1|check1|
| c|constant| null| null|
| d|constant| null| null|
+---+--------+--------------+------+
min(condition)
列是可选的。无论如何,我稍后会过滤掉它。
我可以通过将查询分成两个语句来创建所需的输出,但我觉得这里必须有一个优雅的单一查询解决方案。有人知道如何做到这一点吗?谢谢!
附录
以下是构建测试 table 的命令,以防有人想复制测试:
val columns1 = Seq("pk1", "constant")
val columns2 = Seq("fk1","condition","target")
val data1 = Seq( ("a","constant"), ("b","constant"), ("c","constant"), ("d","constant") )
val data2 = Seq( ("a",1,"check1"), ("a",2,"check2"), ("b",1,"check1"), ("b",2,"check2") )
val t1 = spark.createDataFrame(data1).toDF(columns1:_*)
val t2 = spark.createDataFrame(data2).toDF(columns2:_*)
先在fk1
上按t2
分组,在struct(condition, target)
上用min得到min条件对应的目标值,然后用t1
加入分组结果:
spark.sql("""
WITH t3 AS (
SELECT fk1,
MIN(struct(condition, target))['target'] AS target
FROM t2
GROUP BY fk1
)
SELECT pk1,
constant,
target
FROM t1
LEFT JOIN t3
ON pk1 = fk1
""").show
//+---+--------+------+
//|pk1|constant|target|
//+---+--------+------+
//| a|constant|check1|
//| b|constant|check1|
//| c|constant| null|
//| d|constant| null|
//+---+--------+------+
另一种使用row_number()
window函数的方法:
spark.sql("""
WITH t3 AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY fk1 ORDER BY condition) AS rn
FROM t2
)
SELECT pk1,
constant,
target
FROM t1
LEFT JOIN t3
ON pk1 = fk1
AND rn = 1
""").show
如果你想左连接然后做聚合:
spark.sql("""
SELECT pk1,
MAX(constant) AS constant,
MIN(struct(condition, target))['target'] AS target
FROM t1
LEFT JOIN t2
ON pk1 = fk1
GROUP BY pk1
""").show