如何使用 SQL 在 Databricks/Apache Spark 中获取每个类别的单个记录?

How to get a single record per category in Databricks/Apache Spark using SQL?

问题:

我无法在 Databricks 中获取每个类别的一条记录。例如,我想为下面示例数据中的每个 entry_id 值获取一行。

示例数据:

几乎是我想要的:

如何只获取每个类别的最后一行?可能存在多条完全相同的记录,和/或多条取值不同的记录。这些是数据中的噪声,需要通过恰好选择一条记录来消除。

完整的 SQL 如下:

-- * * *
--
-- INIT THE SESSION
--
-- * * *

-- Session setup: create/select the working database and apply the
-- legacy date/time settings this example was written against.
CREATE DATABASE IF NOT EXISTS test;
USE test;
SET spark.sql.legacy.timeParserPolicy = LEGACY;
SET spark.sql.legacy.parquet.datetimeRebaseModeInWrite = LEGACY;
SELECT CONCAT('using: ', 'test') AS message;


-- Rebuild the sample table. entry_id 1 deliberately has three variant
-- rows to simulate noise; entry_ids 2-9 exercise NULL handling.
DROP TABLE IF EXISTS test;

CREATE TABLE test AS (
  SELECT 'a' AS a, 'b' AS b, 'c' AS c, 'd' AS d, 'e' AS e, 'f' AS f, 0 AS entry_id
  UNION ALL SELECT 'this', 'is',    'NOT',   'foo',   'bar',   'bat',    1
  UNION ALL SELECT 'this', 'is',    'NOT',   'john',  'paul',  'george', 1
  UNION ALL SELECT 'this', 'is',    'the',   'data',  'i',     'want',   1
  UNION ALL SELECT NULL,   'wrong', NULL,    NULL,    NULL,    NULL,     2
  UNION ALL SELECT NULL,   NULL,    'wrong', NULL,    NULL,    NULL,     3
  UNION ALL SELECT NULL,   NULL,    NULL,    'wrong', NULL,    NULL,     4
  UNION ALL SELECT NULL,   NULL,    NULL,    NULL,    'wrong', NULL,     5
  UNION ALL SELECT NULL,   NULL,    NULL,    NULL,    NULL,    'wrong',  6
  UNION ALL SELECT NULL,   NULL,    NULL,    NULL,    NULL,    NULL,     7
  UNION ALL SELECT 'b',    'wrong', NULL,    NULL,    NULL,    NULL,     8
  UNION ALL SELECT 'c',    'wrong', NULL,    NULL,    NULL,    NULL,     9
);

-- Deterministic look at the fixture data.
SELECT * FROM test ORDER BY entry_id;


DROP TABLE IF EXISTS max_a;

-- Per-entry maximum of column `a`.
-- NOTE: MAX() ignores NULLs, so entries whose `a` is entirely NULL
-- (entry_ids 2-7 in the fixture) produce a NULL max.
CREATE TABLE max_a AS (
  SELECT
    entry_id,
    MAX(a) AS a
  FROM test
  GROUP BY entry_id   -- name the column instead of positional `GROUP BY 1`
);
-- The ORDER BY that was inside CREATE TABLE AS has been dropped: stored
-- table order is not guaranteed, so it only added a pointless sort.

SELECT * FROM max_a ORDER BY entry_id;  -- ORDER BY makes the check deterministic



-- Keep the row(s) whose `a` equals the per-entry maximum.
-- Bug fix: the original joined ONLY on `a`, which (1) silently drops
-- every entry whose max(a) is NULL (NULL = NULL is never true) and
-- (2) can cross-match equal `a` values from different entry_ids.
SELECT
  test.*
FROM max_a
INNER JOIN test
  ON  max_a.entry_id = test.entry_id
  AND max_a.a <=> test.a   -- Spark null-safe equality keeps all-NULL groups
ORDER BY test.entry_id
;

-- Same join restricted to entry_id 1. Joining on entry_id + null-safe `a`
-- (instead of `a` alone) fixes the NULL-group and cross-entry match bugs,
-- but entry 1 still returns 3 rows sharing max(a) = 'this' — the noise
-- the question is about, so a tie-breaker is still needed.
SELECT
  test.*
FROM max_a
INNER JOIN test
  ON  max_a.entry_id = test.entry_id
  AND max_a.a <=> test.a
WHERE test.entry_id = 1
;

Databricks Runtime 10.0 及以上版本支持 QUALIFY 子句。

我在 ROW_NUMBER 的 ORDER BY 中用了 NULL(即任意选取一行),不过你可以把它改成你需要的排序条件。

-- QUALIFY filters directly on the window result, no subquery needed.
-- ORDER BY NULL means the surviving row per entry_id is arbitrary,
-- but exactly one row per group is guaranteed.
SELECT *
FROM t
QUALIFY ROW_NUMBER() OVER (PARTITION BY entry_id ORDER BY NULL) = 1
a b c d e f entry_id
a b c d e f 0
this is NOT foo bar bat 1
null wrong null null null null 2
null null wrong null null null 3
null null null wrong null null 4
null null null null wrong null 5
null null null null null wrong 6
null null null null null null 7
b wrong null null null null 8
c wrong null null null null 9

P.S.

Spark 中有很多更好的方法来创建临时数据集

使用 stack 函数

-- Build the fixture inline with stack(): the first argument is the row
-- count, followed by 12 rows of 7 values each.
WITH t (a, b, c, d, e, f, entry_id) AS (
  SELECT stack(
    12,  -- number of rows
    'a',    'b',     'c',     'd',     'e',     'f',      0,
    'this', 'is',    'NOT',   'foo',   'bar',   'bat',    1,
    'this', 'is',    'NOT',   'john',  'paul',  'george', 1,
    'this', 'is',    'the',   'data',  'i',     'want',   1,
    NULL,   'wrong', NULL,    NULL,    NULL,    NULL,     2,
    NULL,   NULL,    'wrong', NULL,    NULL,    NULL,     3,
    NULL,   NULL,    NULL,    'wrong', NULL,    NULL,     4,
    NULL,   NULL,    NULL,    NULL,    'wrong', NULL,     5,
    NULL,   NULL,    NULL,    NULL,    NULL,    'wrong',  6,
    NULL,   NULL,    NULL,    NULL,    NULL,    NULL,     7,
    'b',    'wrong', NULL,    NULL,    NULL,    NULL,     8,
    'c',    'wrong', NULL,    NULL,    NULL,    NULL,     9
  )
)
-- One arbitrary row per entry_id (ORDER BY NULL = no preference).
SELECT *
FROM t
QUALIFY ROW_NUMBER() OVER (PARTITION BY entry_id ORDER BY NULL) = 1

-- Same fixture via an inline VALUES list instead of stack().
WITH t (a, b, c, d, e, f, entry_id) AS (
  SELECT *
  FROM VALUES
    ('a',    'b',     'c',     'd',     'e',     'f',      0),
    ('this', 'is',    'NOT',   'foo',   'bar',   'bat',    1),
    ('this', 'is',    'NOT',   'john',  'paul',  'george', 1),
    ('this', 'is',    'the',   'data',  'i',     'want',   1),
    (NULL,   'wrong', NULL,    NULL,    NULL,    NULL,     2),
    (NULL,   NULL,    'wrong', NULL,    NULL,    NULL,     3),
    (NULL,   NULL,    NULL,    'wrong', NULL,    NULL,     4),
    (NULL,   NULL,    NULL,    NULL,    'wrong', NULL,     5),
    (NULL,   NULL,    NULL,    NULL,    NULL,    'wrong',  6),
    (NULL,   NULL,    NULL,    NULL,    NULL,    NULL,     7),
    ('b',    'wrong', NULL,    NULL,    NULL,    NULL,     8),
    ('c',    'wrong', NULL,    NULL,    NULL,    NULL,     9)
)
-- One arbitrary row per entry_id (ORDER BY NULL = no preference).
SELECT *
FROM t
QUALIFY ROW_NUMBER() OVER (PARTITION BY entry_id ORDER BY NULL) = 1

我最终做了如下事情:

-- Final approach: number rows within each (entry_id, a) group, then keep
-- the row whose row_number matches a precomputed per-entry maximum.
-- NOTE(review): `max_row` is not defined anywhere in this file — presumably
-- a table of (entry_id, max_row_number) built elsewhere; verify it exists
-- before running this.
-- NOTE(review): the ORDER BY inside the subquery cannot influence the final
-- result (the outer query has no ORDER BY); it only adds a sort.
-- NOTE(review): SELECT DISTINCT appears to be masking duplicates from the
-- join — (entry_id, row_number) should already be unique, so confirm
-- whether DISTINCT is actually needed.
select distinct
  test.*
from (
  select
    test.*,
    row_number() over (partition by entry_id, a order by c) as row_number
  from
    test
  order by entry_id, row_number
) test
join max_row mx on 1=1
  and mx.entry_id = test.entry_id 
  and mx.max_row_number = test.row_number
;