How to get the last value for every partition to impute missing values in Spark SQL
I have some sample data in which I want to impute missing values. Rows with missing data are marked with the string blank. Here is the sample data -
val my_df = spark.sql(s"""
select 1 as id, 1 as time_gmt, 'a' as pagename
union
select 1 as id, 2 as time_gmt, 'b' as pagename
union
select 1 as id, 3 as time_gmt, 'blank' as pagename
union
select 1 as id, 4 as time_gmt, 'blank' as pagename
union
select 1 as id, 5 as time_gmt, 'd' as pagename
union
select 2 as id, 1 as time_gmt, 'c' as pagename
union
select 2 as id, 2 as time_gmt, 'a' as pagename
union
select 2 as id, 3 as time_gmt, 'c' as pagename
union
select 2 as id, 4 as time_gmt, 'blank' as pagename
union
select 2 as id, 5 as time_gmt, 'd' as pagename
""")
my_df.createOrReplaceTempView("my_df")
scala> my_df.orderBy("id","time_gmt").show(false)
+---+--------+--------+
|id |time_gmt|pagename|
+---+--------+--------+
|1 |1 |a |
|1 |2 |b |
|1 |3 |blank |
|1 |4 |blank |
|1 |5 |d |
|2 |1 |c |
|2 |2 |a |
|2 |3 |c |
|2 |4 |blank |
|2 |5 |d |
+---+--------+--------+
As you can see, there are 2 blanks for id 1 and 1 blank for id 2. I want to fill them with the latest non-blank value observed for each id, ordered by the time_gmt column. So my output would be -
+---+--------+--------+----------------+
|id |time_gmt|pagename|pagename_imputed|
+---+--------+--------+----------------+
|1 |1 |a | a |
|1 |2 |b | b |
|1 |3 |blank | b |
|1 |4 |blank | b |
|1 |5 |d | d |
|2 |1 |c | c |
|2 |2 |a | a |
|2 |3 |c | c |
|2 |4 |blank | c |
|2 |5 |d | d |
+---+--------+--------+----------------+
How can I do this in Spark SQL?
Note - within each partition, blanks can occur multiple times after a non-blank value.
One option uses window functions. The idea is to define groups of records, where each "blank" record belongs to the same group as the last preceding non-blank record.
Assuming that by blank you mean null, we can define the groups with a window count:
select id, time_gmt,
    max(pagename) over(partition by id, grp) as pagename
from (
    select t.*,
        count(pagename) over(partition by id order by time_gmt) as grp
    from my_df t
) t
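As a sanity check, the grouping trick can be simulated in plain Python (a hand-rolled sketch of the window logic, not Spark itself; blanks represented as None):

```python
# Sample data as (id, time_gmt, pagename), sorted by (id, time_gmt);
# blanks are represented as None.
rows = [(1, 1, 'a'), (1, 2, 'b'), (1, 3, None), (1, 4, None), (1, 5, 'd'),
        (2, 1, 'c'), (2, 2, 'a'), (2, 3, 'c'), (2, 4, None), (2, 5, 'd')]

# count(pagename) over(partition by id order by time_gmt):
# a running count of non-null values, reset per id. Null rows keep the
# count of the last non-null row, so they land in the same group.
grouped, grp, prev_id = [], 0, None
for rid, t, page in rows:
    if rid != prev_id:
        grp, prev_id = 0, rid
    if page is not None:
        grp += 1
    grouped.append((rid, t, page, grp))

# max(pagename) over(partition by id, grp): each group holds exactly one
# non-null value (its first row), so max() returns that value.
group_value = {}
for rid, t, page, grp in grouped:
    if page is not None:
        group_value[(rid, grp)] = page
result = [(rid, t, group_value.get((rid, grp)))
          for rid, t, page, grp in grouped]
# result matches pagename_imputed: (1,3) and (1,4) become 'b', (2,4) becomes 'c'
```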
If you really mean the string 'blank', then:
select id, time_gmt,
    max(case when pagename <> 'blank' then pagename end) over(partition by id, grp) as pagename
from (
    select t.*,
        sum(case when pagename = 'blank' then 0 else 1 end) over(partition by id order by time_gmt) as grp
    from my_df t
) t
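With the literal string 'blank', the only change is how the running sum decides to increment; a plain-Python sketch of the grp computation (not Spark itself):

```python
rows = [(1, 1, 'a'), (1, 2, 'b'), (1, 3, 'blank'), (1, 4, 'blank'), (1, 5, 'd'),
        (2, 1, 'c'), (2, 2, 'a'), (2, 3, 'c'), (2, 4, 'blank'), (2, 5, 'd')]

# sum(case when pagename = 'blank' then 0 else 1 end)
#     over(partition by id order by time_gmt)
grps, grp, prev_id = [], 0, None
for rid, t, page in rows:
    if rid != prev_id:          # new id partition: restart the running sum
        grp, prev_id = 0, rid
    if page != 'blank':         # non-blank rows add 1, blank rows add 0
        grp += 1
    grps.append(grp)
# grps == [1, 2, 2, 2, 3, 1, 2, 3, 3, 4]
```

Each blank row shares its grp value with the last non-blank row before it, which is exactly what the outer max(...) over(partition by id, grp) exploits.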
For this kind of use case there is a dedicated window function, last(expr, [IgnoreNulls]) over():
select id, time_gmt, last(nullif(pagename, 'blank'), true) over(partition by id order by time_gmt) as pagename
from my_df
https://spark.apache.org/docs/latest/api/sql/index.html#last
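For reference, last(..., true) over an ordered window uses the default frame ending at the current row, so it behaves like a per-partition forward fill; a plain-Python equivalent of the one-liner (a sketch, not Spark):

```python
rows = [(1, 1, 'a'), (1, 2, 'b'), (1, 3, 'blank'), (1, 4, 'blank'), (1, 5, 'd'),
        (2, 1, 'c'), (2, 2, 'a'), (2, 3, 'c'), (2, 4, 'blank'), (2, 5, 'd')]

imputed, last_seen, prev_id = [], None, None
for rid, t, page in rows:                 # sorted by (id, time_gmt)
    if rid != prev_id:                    # partition by id: reset state
        last_seen, prev_id = None, rid
    if page != 'blank':                   # nullif(...) + IgnoreNulls: only
        last_seen = page                  # non-blank values update the carry
    imputed.append((rid, t, last_seen))
# imputed matches pagename_imputed from the question
```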