left join with both aggregate function and select on joining table

I have a general SQL question that I think can be answered in any number of SQL dialects, although my example below uses Spark SQL.

I am trying to left join table 1 (t1) to table 2 (t2) with the following goals:

  1. Retain all rows of t1 (hence the left join)
  2. Select a column from t2 based on an aggregate function over t2

The test data looks like this:

t1
+---+--------+
|pk1|constant|
+---+--------+
|  a|constant|
|  b|constant|
|  c|constant|
|  d|constant|
+---+--------+
t2
+---+---------+------+
|fk1|condition|target|
+---+---------+------+
|  a|        1|check1|
|  a|        2|check2|
|  b|        1|check1|
|  b|        2|check2|
+---+---------+------+

Here are a couple of (failing) example queries:

spark.sql("""
    select
        pk1,
        constant,
        target
    from
        t1
    left join
        t2
    on
        pk1 = fk1
    group by
        pk1, constant, target
    having
        min(condition)
""").show
+---+--------+------+
|pk1|constant|target|
+---+--------+------+
|  b|constant|check1|
|  a|constant|check2|
|  a|constant|check1|
|  b|constant|check2|
+---+--------+------+

Problem with query 1: I lose the t1 rows whose pk1 is 'c' or 'd'. To me it behaves like an inner join rather than a left join.

spark.sql("""
    select
        pk1,
        constant,
        min(condition),
        target
    from
        t1
    left join
        t2
    on
        pk1 = fk1
    group by
        pk1, constant, target
""").show
+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
|  a|constant|             1|check1|
|  a|constant|             2|check2|
|  b|constant|             2|check2|
|  b|constant|             1|check1|
|  c|constant|          null|  null|
|  d|constant|          null|  null|
+---+--------+--------------+------+

Problem with query 2: I am no longer filtering down to the minimum of condition. For example, for pk1 = a I get both condition = 1 and condition = 2. The min function does not seem to be applied.

Desired output

+---+--------+--------------+------+
|pk1|constant|min(condition)|target|
+---+--------+--------------+------+
|  a|constant|             1|check1|
|  b|constant|             1|check1|
|  c|constant|          null|  null|
|  d|constant|          null|  null|
+---+--------+--------------+------+

The min(condition) column is optional. I am going to filter it out later anyway.

I can produce the desired output by splitting the query into two statements (sketched below), but I feel like there has to be an elegant single-query solution here. Does anyone know how to do it? Thanks!
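For reference, here is the kind of two-statement split I mean (just a sketch; the temp-view name t2_min is illustrative):

// Statement 1: compute the minimum condition per key (t2_min is an illustrative name)
spark.sql("""
    select fk1, min(condition) as min_condition
    from t2
    group by fk1
""").createOrReplaceTempView("t2_min")

// Statement 2: join back to t2 on the minimum condition, keeping all t1 rows
spark.sql("""
    select t1.pk1, t1.constant, m.min_condition, t2.target
    from t1
    left join t2_min m
        on t1.pk1 = m.fk1
    left join t2
        on t1.pk1 = t2.fk1 and t2.condition = m.min_condition
""").show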

Appendix

Here are the commands to build the test tables, in case anyone wants to reproduce the test:

val columns1 = Seq("pk1", "constant")
val columns2 = Seq("fk1", "condition", "target")
val data1 = Seq( ("a","constant"), ("b","constant"), ("c","constant"), ("d","constant") )
val data2 = Seq( ("a",1,"check1"), ("a",2,"check2"), ("b",1,"check1"), ("b",2,"check2") )
val t1 = spark.createDataFrame(data1).toDF(columns1:_*)
val t2 = spark.createDataFrame(data2).toDF(columns2:_*)
// Register the DataFrames as temp views so the spark.sql queries can reference them
t1.createOrReplaceTempView("t1")
t2.createOrReplaceTempView("t2")

First group t2 by fk1 and take MIN over struct(condition, target): structs compare field by field, so the minimum struct carries the target belonging to the smallest condition. Then left join t1 to the grouped result:

spark.sql("""   
  WITH t3 AS (
    SELECT  fk1, 
            MIN(struct(condition, target))['target'] AS target
    FROM    t2
    GROUP BY fk1
  )

  SELECT  pk1,
          constant,
          target
  FROM    t1
  LEFT JOIN t3
  ON    pk1 = fk1
""").show

//+---+--------+------+
//|pk1|constant|target|
//+---+--------+------+
//|  a|constant|check1|
//|  b|constant|check1|
//|  c|constant|  null|
//|  d|constant|  null|
//+---+--------+------+
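For reference, the same idea expressed through the DataFrame API (a sketch on the same test data, not part of the original answer):

import org.apache.spark.sql.functions.{col, min, struct}

// min over a struct orders by its first field (condition), so extracting
// 'target' from the per-group minimum yields the target of the smallest condition.
val t3 = t2
  .groupBy("fk1")
  .agg(min(struct(col("condition"), col("target")))("target").as("target"))

t1.join(t3, t1("pk1") === t3("fk1"), "left")
  .select("pk1", "constant", "target")
  .show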

Another way, using the row_number() window function:

spark.sql("""
    WITH t3 AS (
      SELECT  *, 
              ROW_NUMBER() OVER (PARTITION BY fk1 ORDER BY condition) AS rn
      FROM    t2
    )
    
    SELECT  pk1,
            constant,
            target
    FROM    t1
    LEFT JOIN t3
    ON    pk1 = fk1
    AND   rn = 1
""").show

And if you prefer to do the left join first and then aggregate:

spark.sql("""   
    SELECT  pk1,
            MAX(constant) AS constant,
            MIN(struct(condition, target))['target'] AS target
    FROM    t1
    LEFT JOIN t2
    ON    pk1 = fk1
    GROUP BY pk1
""").show