Spark Window 函数是否在每个分区中独立工作?

Do Spark Window functions work independently per partition?

我试图为每个 some_guid 获取每天的最新行。 例如,我有以下数据,所有内容按 item_time 降序排列:

+----------+--------------------+-------------+
| file_date|           some_guid|    item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1632637545493|
|2021-11-22|22549ca165d88ffd2...|1632723945493|
|2021-11-22|22549ca165d88ffd2...|1632810345493|
|2021-11-22|22549ca165d88ffd2...|1632896745493|
|2021-11-22|22549ca165d88ffd2...|1632983145493|
|2021-11-22|22549ca165d88ffd2...|1633069545493|
|2021-11-22|22549ca165d88ffd2...|1633155945493|
|2021-11-22|22549ca165d88ffd2...|1633242345493|
|2021-11-22|22549ca165d88ffd2...|1633328745493|
|2021-11-22|22549ca165d88ffd2...|1633415145493|
|2021-11-22|22549ca165d88ffd2...|1633501545493|
|2021-11-22|22549ca165d88ffd2...|1633587945493|
|2021-11-22|22549ca165d88ffd2...|1633674345493|
|2021-11-22|22549ca165d88ffd2...|1633760745493|
|2021-11-22|22549ca165d88ffd2...|1633847145493|

如您所见,item_time 中的所有字段都不相同。然后我应用以下转换:

daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())
df.select('file_date','some_guid', first('item_time').over(daily_window).alias('item_time'))

并得到如下结果:

+----------+--------------------+-------------+
| file_date|           some_guid|    item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|

有很多重复项,但我希望只有一行。为什么会这样? window 函数是否在每个分区中执行并给出相同的行,稍后打印的次数与我拥有的分区一样多?

UPD:

如果我有第四列,我该如何做呢?例如,select 以下数据集中的最后一行:

+----------+--------------------+-------------+------+
| file_date|           some_guid|    item_time|  col4|
+----------+--------------------+-------------+------+
|2021-11-22|22549ca165d88ffd2...|1632562345493| data1|
|2021-11-22|22549ca165d88ffd2...|1632637545493| data2|
|2021-11-22|22549ca165d88ffd2...|1632723945493| data3|
|2021-11-22|22549ca165d88ffd2...|1632810345493| data4|

而且我需要 select 最后一行 'data4' 在 col4

使用分组:

df.groupBy('file_date','some_guid').agg(max('item_time'))

或使用window函数(即用rank/row_number)枚举记录,然后使用where/filter到select想要的记录

您正在按 fill_datesome_guid 聚合,查看您的数据,您只有一组:

fill_date some_guid
2021-11-22 22549ca165d88ffd2...

(我们没有看到 some_guid 的其余部分,并且似乎所有行都相同)

然后,它为所有行应用第一个值。直到这里它似乎是正确的。

但是,我建议您尝试 withColumn() 而不是 select:

df.withColumn('item_time'), first('item_time').over(daily_window))

编辑:

如果您只希望使用 groupby 一行。按照之前的回答:

Windows 是当您希望组的所有行都具有基于组值的计算值时。

使用 row_number 后接过滤器。

daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())
df.select(
    'file_date','some_guid','col4',
    row_number().over(daily_window).alias('rn')
  ).filter("rn = 1")