Pyspark - 数据框中给定日期之前的有序最小值
Pyspark - Orderly min value until a given date in a dataframe
假设我有一个数据框如下:
date
timestamp
value
2022-01-05
2022-01-05 06:00:00
-0.3
2022-01-04
2022-01-04 04:00:00
-0.6
2022-01-03
2022-01-03 15:00:00
-0.1
2022-01-03
2022-01-03 10:00:00
-0.15
2022-01-02
2022-01-02 14:00:00
-0.3
2022-01-02
2022-01-02 12:00:00
-0.1
2022-01-01
2022-01-01 12:00:00
-0.2
我想创建一个具有最新最小值的列,直到 timestamp
所以结果将是:
date
timestamp
value
min_value_until_now
2022-01-05
2022-01-05 06:00:00
-0.3
-0.6
2022-01-04
2022-01-04 04:00:00
-0.6
-0.3
2022-01-03
2022-01-03 15:00:00
-0.1
-0.3
2022-01-03
2022-01-03 10:00:00
-0.15
-0.3
2022-01-02
2022-01-02 14:00:00
-0.3
-0.2
2022-01-02
2022-01-02 12:00:00
-0.1
-0.2
2022-01-01
2022-01-01 12:00:00
-0.2
-0.2
在2022-01-01
上没有历史数据,因此我可以用-0.2
代替它,这是开始时唯一可用的点。
我该怎么做?我试过开窗但没有成功。
需要注意的重要一点是 min_value_until_now
应该单调递减。
如有任何帮助,我们将不胜感激。
在 Window 上使用 min
函数:
from pyspark.sql import functions as F, Window
w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, -1)
df.withColumn(
"min_value_until_now",
F.coalesce(F.min("value").over(w), F.col("value"))
).show()
#+----------+-------------------+-----+-------------------+
#| date| timestamp|value|min_value_until_now|
#+----------+-------------------+-----+-------------------+
#|2022-01-01|2022-01-01 12:00:00| -0.2| -0.2|
#|2022-01-02|2022-01-02 12:00:00| -0.1| -0.2|
#|2022-01-02|2022-01-02 14:00:00| -0.3| -0.2|
#|2022-01-03|2022-01-03 10:00:00|-0.15| -0.3|
#|2022-01-03|2022-01-03 15:00:00| -0.1| -0.3|
#|2022-01-04|2022-01-04 04:00:00| -0.6| -0.3|
#|2022-01-05|2022-01-05 06:00:00| -0.3| -0.6|
#+----------+-------------------+-----+-------------------+
请注意,使用 non-partitioned Window 可能会对性能产生不良影响。如果您有可以分区的列 ID
,则应添加 partitionBy
子句。
假设我有一个数据框如下:
date | timestamp | value |
---|---|---|
2022-01-05 | 2022-01-05 06:00:00 | -0.3 |
2022-01-04 | 2022-01-04 04:00:00 | -0.6 |
2022-01-03 | 2022-01-03 15:00:00 | -0.1 |
2022-01-03 | 2022-01-03 10:00:00 | -0.15 |
2022-01-02 | 2022-01-02 14:00:00 | -0.3 |
2022-01-02 | 2022-01-02 12:00:00 | -0.1 |
2022-01-01 | 2022-01-01 12:00:00 | -0.2 |
我想创建一个具有最新最小值的列,直到 timestamp
所以结果将是:
date | timestamp | value | min_value_until_now |
---|---|---|---|
2022-01-05 | 2022-01-05 06:00:00 | -0.3 | -0.6 |
2022-01-04 | 2022-01-04 04:00:00 | -0.6 | -0.3 |
2022-01-03 | 2022-01-03 15:00:00 | -0.1 | -0.3 |
2022-01-03 | 2022-01-03 10:00:00 | -0.15 | -0.3 |
2022-01-02 | 2022-01-02 14:00:00 | -0.3 | -0.2 |
2022-01-02 | 2022-01-02 12:00:00 | -0.1 | -0.2 |
2022-01-01 | 2022-01-01 12:00:00 | -0.2 | -0.2 |
在2022-01-01
上没有历史数据,因此我可以用-0.2
代替它,这是开始时唯一可用的点。
我该怎么做?我试过开窗但没有成功。
需要注意的重要一点是 min_value_until_now
应该单调递减。
如有任何帮助,我们将不胜感激。
在 Window 上使用 min
函数:
from pyspark.sql import functions as F, Window
w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, -1)
df.withColumn(
"min_value_until_now",
F.coalesce(F.min("value").over(w), F.col("value"))
).show()
#+----------+-------------------+-----+-------------------+
#| date| timestamp|value|min_value_until_now|
#+----------+-------------------+-----+-------------------+
#|2022-01-01|2022-01-01 12:00:00| -0.2| -0.2|
#|2022-01-02|2022-01-02 12:00:00| -0.1| -0.2|
#|2022-01-02|2022-01-02 14:00:00| -0.3| -0.2|
#|2022-01-03|2022-01-03 10:00:00|-0.15| -0.3|
#|2022-01-03|2022-01-03 15:00:00| -0.1| -0.3|
#|2022-01-04|2022-01-04 04:00:00| -0.6| -0.3|
#|2022-01-05|2022-01-05 06:00:00| -0.3| -0.6|
#+----------+-------------------+-----+-------------------+
请注意,使用 non-partitioned Window 可能会对性能产生不良影响。如果您有可以分区的列 ID
,则应添加 partitionBy
子句。