如何使用 SQL 在 Databricks/Apache Spark 中获取每个类别的单个记录?
How to get a single record per category in Databricks/Apache Spark using SQL?
问题:
我无法在 Databricks 中获取每个类别的一条记录。例如,我想为下面示例数据中的每个 entry_id 值获取一行。
示例数据:
几乎是我想要的:
如何只获取每个类别的最后一行?可能存在多条完全相同的记录,和/或多条取值不同的记录。这些是数据中的噪声,需要通过恰好选择一条记录来消除。
完整的 SQL 如下:
-- * * *
--
-- INIT THE SESSION
--
-- * * *
-- Session setup: create and select a scratch database, then switch two
-- Spark settings to LEGACY (pre-Spark-3.0 behavior) for date/time parsing
-- and Parquet datetime rebasing -- NOTE(review): neither setting is needed
-- by the queries below; presumably copied from a shared init script.
create database if not exists test;
use test;
set spark.sql.legacy.timeParserPolicy = LEGACY;
set spark.sql.legacy.parquet.datetimeRebaseModeInWrite = LEGACY;
-- Echo which database this session is using.
select concat('using: ', 'test') as message;
-- Build the sample data set: six string columns a..f plus an integer
-- entry_id that groups rows into categories (entry_id = 1 appears three
-- times to simulate the noisy duplicates the question is about).
drop table if exists test;
create table test as (
select * from values
    ('a',    'b',     'c',     'd',     'e',     'f',      0),
    ('this', 'is',    'NOT',   'foo',   'bar',   'bat',    1),
    ('this', 'is',    'NOT',   'john',  'paul',  'george', 1),
    ('this', 'is',    'the',   'data',  'i',     'want',   1),
    (null,   'wrong', null,    null,    null,    null,     2),
    (null,   null,    'wrong', null,    null,    null,     3),
    (null,   null,    null,    'wrong', null,    null,     4),
    (null,   null,    null,    null,    'wrong', null,     5),
    (null,   null,    null,    null,    null,    'wrong',  6),
    (null,   null,    null,    null,    null,    null,     7),
    ('b',    'wrong', null,    null,    null,    null,     8),
    ('c',    'wrong', null,    null,    null,    null,     9)
    as t(a, b, c, d, e, f, entry_id)
);
-- Sanity check: list every row in category order.
select * from test order by entry_id;
-- One row per category: entry_id with the max value of column a.
-- max() ignores nulls, so an all-null category (entry_id 2..7) yields null.
drop table if exists max_a;
create table max_a as (
select
    entry_id,
    max(a) as a
from
    test
-- name the columns instead of positional "group by 1 / order by 1":
-- positional references silently break when the select list changes.
group by entry_id
order by entry_id
);
select * from max_a;
-- The asker's "almost works" query: join the per-category max back to the
-- base table on column a only.
-- NOTE(review): this is demonstrably wrong in two ways visible in the data --
-- (1) categories whose max(a) is null (entry_id 2..7) are dropped because
--     null never matches in an equi-join; and
-- (2) entry_id 1 still returns three rows, since all three share a = 'this'.
select
test.*
from
max_a join test on max_a.a = test.a
order by entry_id
;
-- Same join restricted to one category: entry_id = 1 still yields multiple
-- rows because every row in that category has the same max value a = 'this'.
select
test.*
from
max_a join test on max_a.a = test.a
where test.entry_id = 1
;
Databricks Runtime 10.0 及以上支持QUALIFY
我在 ROW_NUMBER 的窗口里按 NULL 排序(即不指定优先顺序),你可以把它改成任何你喜欢的排序字段。
-- QUALIFY filters on a window-function result without a wrapping subquery
-- (supported on Databricks Runtime 10.0+). "order by null" expresses no
-- preference, so the surviving row per entry_id is arbitrary -- substitute
-- a real ordering column to pick a deterministic winner.
select *
from t
qualify row_number() over (partition by entry_id order by null) = 1
a | b | c | d | e | f | entry_id |
---|---|---|---|---|---|---|
a | b | c | d | e | f | 0 |
this | is | NOT | foo | bar | bat | 1 |
null | wrong | null | null | null | null | 2 |
null | null | wrong | null | null | null | 3 |
null | null | null | wrong | null | null | 4 |
null | null | null | null | wrong | null | 5 |
null | null | null | null | null | wrong | 6 |
null | null | null | null | null | null | 7 |
b | wrong | null | null | null | null | 8 |
c | wrong | null | null | null | null | 9 |
P.S.
Spark 中有很多更好的方法来创建临时数据集
方式一:使用 stack 函数
-- Build the sample rows inline with stack(): the first argument is the row
-- count; the remaining values are folded 7-at-a-time into one row of
-- columns (a, b, c, d, e, f, entry_id).
with t(a,b,c,d,e,f,entry_id)
as
(
select stack
(
12 -- number of rows
,'a' ,'b' ,'c' ,'d' ,'e' ,'f' ,0
,'this' ,'is' ,'NOT' ,'foo' ,'bar' ,'bat' ,1
,'this' ,'is' ,'NOT' ,'john' ,'paul' ,'george' ,1
,'this' ,'is' ,'the' ,'data' ,'i' ,'want' ,1
,null ,'wrong' , null ,null ,null ,null ,2
,null ,null ,'wrong' ,null ,null ,null ,3
,null ,null ,null ,'wrong' ,null ,null ,4
,null ,null ,null ,null ,'wrong' ,null ,5
,null ,null ,null ,null ,null ,'wrong' ,6
,null ,null ,null ,null ,null ,null ,7
,'b' ,'wrong' ,null ,null ,null ,null ,8
,'c' ,'wrong' ,null ,null ,null ,null ,9
)
)
-- Keep one arbitrary row per entry_id ("order by null" = no preference).
select *
from t
qualify row_number() over (partition by entry_id order by null) = 1
方式二:使用 VALUES 子句
-- Same data built with a VALUES inline table: one parenthesized tuple per
-- row, column names supplied by the CTE's column list.
with t(a,b,c,d,e,f,entry_id)
as
(
select *
from values ('a' ,'b' ,'c' ,'d' ,'e' ,'f' ,0)
,('this' ,'is' ,'NOT' ,'foo' ,'bar' ,'bat' ,1)
,('this' ,'is' ,'NOT' ,'john' ,'paul' ,'george' ,1)
,('this' ,'is' ,'the' ,'data' ,'i' ,'want' ,1)
,(null ,'wrong' , null ,null ,null ,null ,2)
,(null ,null ,'wrong' ,null ,null ,null ,3)
,(null ,null ,null ,'wrong' ,null ,null ,4)
,(null ,null ,null ,null ,'wrong' ,null ,5)
,(null ,null ,null ,null ,null ,'wrong' ,6)
,(null ,null ,null ,null ,null ,null ,7)
,('b' ,'wrong' ,null ,null ,null ,null ,8)
,('c' ,'wrong' ,null ,null ,null ,null ,9)
)
-- Keep one arbitrary row per entry_id ("order by null" = no preference).
select *
from t
qualify row_number() over (partition by entry_id order by null) = 1
我最终做了如下事情:
-- Asker's final approach: number the rows within each (entry_id, a) group,
-- then keep the row whose number equals the per-category maximum stored in
-- max_row (a table built elsewhere -- its definition is not shown here).
-- NOTE(review): "select distinct" is kept as-is; it masks duplicates rather
-- than preventing them, but removing it could change the result set.
select distinct
    test.*
from (
    select
        test.*,
        row_number() over (partition by entry_id, a order by c) as row_number
    from
        test
    -- the original inner "order by entry_id, row_number" was dropped:
    -- a subquery's ordering is discarded by the outer join, so it only
    -- cost a sort without affecting the result.
) test
join max_row mx
    on mx.entry_id = test.entry_id
    and mx.max_row_number = test.row_number
;
问题:
我无法在 Databricks 中获取每个类别的一条记录。例如,我想为下面示例数据中的每个 entry_id 值获取一行。
示例数据:
几乎是我想要的:
如何只获取每个类别的最后一行?可能存在多条完全相同的记录,和/或多条取值不同的记录。这些是数据中的噪声,需要通过恰好选择一条记录来消除。
完整的 SQL 如下:
-- * * *
--
-- INIT THE SESSION
--
-- * * *
-- Session setup: create and select a scratch database, then switch two
-- Spark settings to LEGACY (pre-Spark-3.0 behavior) for date/time parsing
-- and Parquet datetime rebasing -- NOTE(review): neither setting is needed
-- by the queries below; presumably copied from a shared init script.
create database if not exists test;
use test;
set spark.sql.legacy.timeParserPolicy = LEGACY;
set spark.sql.legacy.parquet.datetimeRebaseModeInWrite = LEGACY;
-- Echo which database this session is using.
select concat('using: ', 'test') as message;
-- Build the sample data set: six string columns a..f plus an integer
-- entry_id that groups rows into categories (entry_id = 1 appears three
-- times to simulate the noisy duplicates the question is about).
drop table if exists test;
create table test as (
select * from values
    ('a',    'b',     'c',     'd',     'e',     'f',      0),
    ('this', 'is',    'NOT',   'foo',   'bar',   'bat',    1),
    ('this', 'is',    'NOT',   'john',  'paul',  'george', 1),
    ('this', 'is',    'the',   'data',  'i',     'want',   1),
    (null,   'wrong', null,    null,    null,    null,     2),
    (null,   null,    'wrong', null,    null,    null,     3),
    (null,   null,    null,    'wrong', null,    null,     4),
    (null,   null,    null,    null,    'wrong', null,     5),
    (null,   null,    null,    null,    null,    'wrong',  6),
    (null,   null,    null,    null,    null,    null,     7),
    ('b',    'wrong', null,    null,    null,    null,     8),
    ('c',    'wrong', null,    null,    null,    null,     9)
    as t(a, b, c, d, e, f, entry_id)
);
-- Sanity check: list every row in category order.
select * from test order by entry_id;
-- One row per category: entry_id with the max value of column a.
-- max() ignores nulls, so an all-null category (entry_id 2..7) yields null.
drop table if exists max_a;
create table max_a as (
select
    entry_id,
    max(a) as a
from
    test
-- name the columns instead of positional "group by 1 / order by 1":
-- positional references silently break when the select list changes.
group by entry_id
order by entry_id
);
select * from max_a;
-- The asker's "almost works" query: join the per-category max back to the
-- base table on column a only.
-- NOTE(review): this is demonstrably wrong in two ways visible in the data --
-- (1) categories whose max(a) is null (entry_id 2..7) are dropped because
--     null never matches in an equi-join; and
-- (2) entry_id 1 still returns three rows, since all three share a = 'this'.
select
test.*
from
max_a join test on max_a.a = test.a
order by entry_id
;
-- Same join restricted to one category: entry_id = 1 still yields multiple
-- rows because every row in that category has the same max value a = 'this'.
select
test.*
from
max_a join test on max_a.a = test.a
where test.entry_id = 1
;
Databricks Runtime 10.0 及以上支持QUALIFY
我在 ROW_NUMBER 的窗口里按 NULL 排序(即不指定优先顺序),你可以把它改成任何你喜欢的排序字段。
-- QUALIFY filters on a window-function result without a wrapping subquery
-- (supported on Databricks Runtime 10.0+). "order by null" expresses no
-- preference, so the surviving row per entry_id is arbitrary -- substitute
-- a real ordering column to pick a deterministic winner.
select *
from t
qualify row_number() over (partition by entry_id order by null) = 1
a | b | c | d | e | f | entry_id |
---|---|---|---|---|---|---|
a | b | c | d | e | f | 0 |
this | is | NOT | foo | bar | bat | 1 |
null | wrong | null | null | null | null | 2 |
null | null | wrong | null | null | null | 3 |
null | null | null | wrong | null | null | 4 |
null | null | null | null | wrong | null | 5 |
null | null | null | null | null | wrong | 6 |
null | null | null | null | null | null | 7 |
b | wrong | null | null | null | null | 8 |
c | wrong | null | null | null | null | 9 |
P.S.
Spark 中有很多更好的方法来创建临时数据集
方式一:使用 stack 函数
-- Build the sample rows inline with stack(): the first argument is the row
-- count; the remaining values are folded 7-at-a-time into one row of
-- columns (a, b, c, d, e, f, entry_id).
with t(a,b,c,d,e,f,entry_id)
as
(
select stack
(
12 -- number of rows
,'a' ,'b' ,'c' ,'d' ,'e' ,'f' ,0
,'this' ,'is' ,'NOT' ,'foo' ,'bar' ,'bat' ,1
,'this' ,'is' ,'NOT' ,'john' ,'paul' ,'george' ,1
,'this' ,'is' ,'the' ,'data' ,'i' ,'want' ,1
,null ,'wrong' , null ,null ,null ,null ,2
,null ,null ,'wrong' ,null ,null ,null ,3
,null ,null ,null ,'wrong' ,null ,null ,4
,null ,null ,null ,null ,'wrong' ,null ,5
,null ,null ,null ,null ,null ,'wrong' ,6
,null ,null ,null ,null ,null ,null ,7
,'b' ,'wrong' ,null ,null ,null ,null ,8
,'c' ,'wrong' ,null ,null ,null ,null ,9
)
)
-- Keep one arbitrary row per entry_id ("order by null" = no preference).
select *
from t
qualify row_number() over (partition by entry_id order by null) = 1
方式二:使用 VALUES 子句
-- Same data built with a VALUES inline table: one parenthesized tuple per
-- row, column names supplied by the CTE's column list.
with t(a,b,c,d,e,f,entry_id)
as
(
select *
from values ('a' ,'b' ,'c' ,'d' ,'e' ,'f' ,0)
,('this' ,'is' ,'NOT' ,'foo' ,'bar' ,'bat' ,1)
,('this' ,'is' ,'NOT' ,'john' ,'paul' ,'george' ,1)
,('this' ,'is' ,'the' ,'data' ,'i' ,'want' ,1)
,(null ,'wrong' , null ,null ,null ,null ,2)
,(null ,null ,'wrong' ,null ,null ,null ,3)
,(null ,null ,null ,'wrong' ,null ,null ,4)
,(null ,null ,null ,null ,'wrong' ,null ,5)
,(null ,null ,null ,null ,null ,'wrong' ,6)
,(null ,null ,null ,null ,null ,null ,7)
,('b' ,'wrong' ,null ,null ,null ,null ,8)
,('c' ,'wrong' ,null ,null ,null ,null ,9)
)
-- Keep one arbitrary row per entry_id ("order by null" = no preference).
select *
from t
qualify row_number() over (partition by entry_id order by null) = 1
我最终做了如下事情:
-- Asker's final approach: number the rows within each (entry_id, a) group,
-- then keep the row whose number equals the per-category maximum stored in
-- max_row (a table built elsewhere -- its definition is not shown here).
-- NOTE(review): "select distinct" is kept as-is; it masks duplicates rather
-- than preventing them, but removing it could change the result set.
select distinct
    test.*
from (
    select
        test.*,
        row_number() over (partition by entry_id, a order by c) as row_number
    from
        test
    -- the original inner "order by entry_id, row_number" was dropped:
    -- a subquery's ordering is discarded by the outer join, so it only
    -- cost a sort without affecting the result.
) test
join max_row mx
    on mx.entry_id = test.entry_id
    and mx.max_row_number = test.row_number
;