Spark Window 函数是否在每个分区中独立工作?
Do Spark Window functions work independently per partition?
我试图为每个 some_guid 获取每天的最新行。
例如,我有以下数据,所有内容按 item_time 降序排列:
+----------+--------------------+-------------+
| file_date| some_guid| item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1632637545493|
|2021-11-22|22549ca165d88ffd2...|1632723945493|
|2021-11-22|22549ca165d88ffd2...|1632810345493|
|2021-11-22|22549ca165d88ffd2...|1632896745493|
|2021-11-22|22549ca165d88ffd2...|1632983145493|
|2021-11-22|22549ca165d88ffd2...|1633069545493|
|2021-11-22|22549ca165d88ffd2...|1633155945493|
|2021-11-22|22549ca165d88ffd2...|1633242345493|
|2021-11-22|22549ca165d88ffd2...|1633328745493|
|2021-11-22|22549ca165d88ffd2...|1633415145493|
|2021-11-22|22549ca165d88ffd2...|1633501545493|
|2021-11-22|22549ca165d88ffd2...|1633587945493|
|2021-11-22|22549ca165d88ffd2...|1633674345493|
|2021-11-22|22549ca165d88ffd2...|1633760745493|
|2021-11-22|22549ca165d88ffd2...|1633847145493|
如您所见,item_time 中的所有字段都不相同。然后我应用以下转换:
daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())
df.select('file_date','some_guid', first('item_time').over(daily_window).alias('item_time'))
并得到如下结果:
+----------+--------------------+-------------+
| file_date| some_guid| item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
有很多重复项,但我希望只有一行。为什么会这样? window 函数是否在每个分区中执行并给出相同的行,稍后打印的次数与我拥有的分区一样多?
UPD:
如果我有第四列,我该如何做呢?例如,select 以下数据集中的最后一行:
+----------+--------------------+-------------+------+
| file_date| some_guid| item_time| col4|
+----------+--------------------+-------------+------+
|2021-11-22|22549ca165d88ffd2...|1632562345493| data1|
|2021-11-22|22549ca165d88ffd2...|1632637545493| data2|
|2021-11-22|22549ca165d88ffd2...|1632723945493| data3|
|2021-11-22|22549ca165d88ffd2...|1632810345493| data4|
而且我需要 select 最后一行 'data4' 在 col4
使用分组:
df.groupBy('file_date','some_guid').agg(max('item_time'))
或使用window函数(即用rank/row_number)枚举记录,然后使用where/filter到select想要的记录
您正在按 fill_date
和 some_guid
聚合,查看您的数据,您只有一组:
fill_date
some_guid
2021-11-22
22549ca165d88ffd2...
(我们没有看到 some_guid
的其余部分,并且似乎所有行都相同)
然后,它为所有行应用第一个值。直到这里它似乎是正确的。
但是,我建议您尝试 withColumn()
而不是 select
:
df.withColumn('item_time'), first('item_time').over(daily_window))
编辑:
如果您只希望使用 groupby 一行。按照之前的回答:
Windows
是当您希望组的所有行都具有基于组值的计算值时。
使用 row_number
后接过滤器。
daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())
df.select(
'file_date','some_guid','col4',
row_number().over(daily_window).alias('rn')
).filter("rn = 1")
我试图为每个 some_guid 获取每天的最新行。 例如,我有以下数据,所有内容按 item_time 降序排列:
+----------+--------------------+-------------+
| file_date| some_guid| item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1632637545493|
|2021-11-22|22549ca165d88ffd2...|1632723945493|
|2021-11-22|22549ca165d88ffd2...|1632810345493|
|2021-11-22|22549ca165d88ffd2...|1632896745493|
|2021-11-22|22549ca165d88ffd2...|1632983145493|
|2021-11-22|22549ca165d88ffd2...|1633069545493|
|2021-11-22|22549ca165d88ffd2...|1633155945493|
|2021-11-22|22549ca165d88ffd2...|1633242345493|
|2021-11-22|22549ca165d88ffd2...|1633328745493|
|2021-11-22|22549ca165d88ffd2...|1633415145493|
|2021-11-22|22549ca165d88ffd2...|1633501545493|
|2021-11-22|22549ca165d88ffd2...|1633587945493|
|2021-11-22|22549ca165d88ffd2...|1633674345493|
|2021-11-22|22549ca165d88ffd2...|1633760745493|
|2021-11-22|22549ca165d88ffd2...|1633847145493|
如您所见,item_time 中的所有字段都不相同。然后我应用以下转换:
daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())
df.select('file_date','some_guid', first('item_time').over(daily_window).alias('item_time'))
并得到如下结果:
+----------+--------------------+-------------+
| file_date| some_guid| item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
有很多重复项,但我希望只有一行。为什么会这样? window 函数是否在每个分区中执行并给出相同的行,稍后打印的次数与我拥有的分区一样多?
UPD:
如果我有第四列,我该如何做呢?例如,select 以下数据集中的最后一行:
+----------+--------------------+-------------+------+
| file_date| some_guid| item_time| col4|
+----------+--------------------+-------------+------+
|2021-11-22|22549ca165d88ffd2...|1632562345493| data1|
|2021-11-22|22549ca165d88ffd2...|1632637545493| data2|
|2021-11-22|22549ca165d88ffd2...|1632723945493| data3|
|2021-11-22|22549ca165d88ffd2...|1632810345493| data4|
而且我需要 select 最后一行 'data4' 在 col4
使用分组:
df.groupBy('file_date','some_guid').agg(max('item_time'))
或使用window函数(即用rank/row_number)枚举记录,然后使用where/filter到select想要的记录
您正在按 fill_date
和 some_guid
聚合,查看您的数据,您只有一组:
fill_date | some_guid |
---|---|
2021-11-22 | 22549ca165d88ffd2... |
(我们没有看到 some_guid
的其余部分,并且似乎所有行都相同)
然后,它为所有行应用第一个值。直到这里它似乎是正确的。
但是,我建议您尝试 withColumn()
而不是 select
:
df.withColumn('item_time'), first('item_time').over(daily_window))
编辑:
如果您只希望使用 groupby 一行。按照之前的回答:
Windows
是当您希望组的所有行都具有基于组值的计算值时。
使用 row_number
后接过滤器。
daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())
df.select(
'file_date','some_guid','col4',
row_number().over(daily_window).alias('rn')
).filter("rn = 1")