计算给定日期范围内 window 中的行数 pyspark

Count rows in a window in a given date range pyspark

我在 table

中有以下信息
| equipment | run | runend   | failure | removal_date |
| A         | 1   | 1/1/2021 | 0       | 4/1/2021     |
| A         | 2   | 2/1/2021 | 0       | 4/1/2021     |
| A         | 3   | 3/1/2021 | 0       | 4/1/2021     |
| A         | 4   | 4/1/2021 | 1       | 4/1/2021     |
| A         | 5   | 4/1/2021 | 0       | 20/1/2021   |
| A         | 6   | 10/1/2021 | 0       | 20/1/2021     |

我想创建一个额外的列来倒计时到失败点,所以看起来像这样:

| equipment | run | runend    | failure | removal_date | RUL |
| A         | 1   | 1/1/2021  | 0       | 4/1/2021     | 3   |
| A         | 2   | 2/1/2021  | 0       | 4/1/2021     | 2   |
| A         | 3   | 3/1/2021  | 0       | 4/1/2021     | 1   |
| A         | 4   | 4/1/2021  | 1       | 4/1/2021     | 0   |
| A         | 5   | 4/1/2021  | 0       | 20/1/2021    | 16  |
| A         | 6   | 10/1/2021 | 0       | 20/1/2021    | 10  |

所以基本上是一个计数,其中每一行都被计算到最接近 table 中显示的 removal_date 的运行端。

我认为这可以使用 window 函数来实现,并且我设法添加了一个列来计算设备的所有行,但我仍然不知道如何缩小这个范围 window 向下然后计数,其中第一行实际采用最后一次计数并向后工作。这是我目前所拥有的:

w = Window.partitionBy("equipment", "run").orderBy(asc("runend"))
df = df.withColumn("rank", rank().over(w))

# Just to see what the df looks like
df.where(col("equipment") == "A").groupby("equipment", "run", "rank", "failure", "runend", "removal_date").count().orderBy("equipment", "runend").show()

所以我得到了一个看起来像那样的 table,我认为我在正确的轨道上,但仍然缺少一些部分

| equipment | run | runend    | failure | removal_date | rank|
| A         | 1   | 1/1/2021  | 0       | 4/1/2021     | 1   |
| A         | 2   | 2/1/2021  | 0       | 4/1/2021     | 2   |
| A         | 3   | 3/1/2021  | 0       | 4/1/2021     | 3   |
| A         | 4   | 4/1/2021  | 1       | 4/1/2021     | 4   |
| A         | 5   | 4/1/2021  | 0       | 20/1/2021    | 5   |
| A         | 6   | 10/1/2021 | 0       | 20/1/2021    | 6   |

这可以使用简单的 datediff 函数完成,无需使用 window 概念。以下是代码。

>>> from pyspark.sql import functions as f

>>> df1 = spark.createDataFrame([
        ("A",1,"1/1/2021",0,"4/1/2021"),
        ("A",2,"2/1/2021",0,"4/1/2021"),
        ("A",3,"3/1/2021",0,"4/1/2021"),
        ("A",4,"4/1/2021",1,"4/1/2021"),
        ("A",5,"4/1/2021",0,"20/1/2021"),
        ("A",6,"10/1/2021",0,"20/1/2021")
    ], schema=["equipment","run","runend","failure","removal_date"]
)

>>> df1.show()
+---------+---+---------+-------+------------+
|equipment|run|   runend|failure|removal_date|
+---------+---+---------+-------+------------+
|        A|  1| 1/1/2021|      0|    4/1/2021|
|        A|  2| 2/1/2021|      0|    4/1/2021|
|        A|  3| 3/1/2021|      0|    4/1/2021|
|        A|  4| 4/1/2021|      1|    4/1/2021|
|        A|  5| 4/1/2021|      0|   20/1/2021|
|        A|  6|10/1/2021|      0|   20/1/2021|
+---------+---+---------+-------+------------+

>>> df2 = df1.withColumn("runend", f.to_date(f.col("runend"), "d/M/yyyy")) \
        .withColumn("removal_date", f.to_date(f.col("removel_date"), "d/M/yyyy"))   

>>> df2.show()
+---------+---+----------+-------+------------+
|equipment|run|    runend|failure|removal_date|
+---------+---+----------+-------+------------+
|        A|  1|2021-01-01|      0|  2021-01-04|
|        A|  2|2021-01-02|      0|  2021-01-04|
|        A|  3|2021-01-03|      0|  2021-01-04|
|        A|  4|2021-01-04|      1|  2021-01-04|
|        A|  5|2021-01-04|      0|  2021-01-20|
|        A|  6|2021-01-10|      0|  2021-01-20|
+---------+---+----------+-------+------------+

>>> df3 = df2.withColumn("RUL", f.datediff(f.col("removal_date"), f.col("runend")))

>>> df3.show()
+---------+---+----------+-------+------------+---+
|equipment|run|    runend|failure|removal_date|RUL|
+---------+---+----------+-------+------------+---+
|        A|  1|2021-01-01|      0|  2021-01-04|  3|
|        A|  2|2021-01-02|      0|  2021-01-04|  2|
|        A|  3|2021-01-03|      0|  2021-01-04|  1|
|        A|  4|2021-01-04|      1|  2021-01-04|  0|
|        A|  5|2021-01-04|      0|  2021-01-20| 16|
|        A|  6|2021-01-10|      0|  2021-01-20| 10|
+---------+---+----------+-------+------------+---+