Count rows in a window in a given date range pyspark
I have the following information in a table:
| equipment | run | runend | failure | removal_date |
|---|---|---|---|---|
| A | 1 | 1/1/2021 | 0 | 4/1/2021 |
| A | 2 | 2/1/2021 | 0 | 4/1/2021 |
| A | 3 | 3/1/2021 | 0 | 4/1/2021 |
| A | 4 | 4/1/2021 | 1 | 4/1/2021 |
| A | 5 | 4/1/2021 | 0 | 20/1/2021 |
| A | 6 | 10/1/2021 | 0 | 20/1/2021 |
I want to create an additional column that counts down to the point of failure, so it would look like this:
| equipment | run | runend | failure | removal_date | RUL |
|---|---|---|---|---|---|
| A | 1 | 1/1/2021 | 0 | 4/1/2021 | 3 |
| A | 2 | 2/1/2021 | 0 | 4/1/2021 | 2 |
| A | 3 | 3/1/2021 | 0 | 4/1/2021 | 1 |
| A | 4 | 4/1/2021 | 1 | 4/1/2021 | 0 |
| A | 5 | 4/1/2021 | 0 | 20/1/2021 | 16 |
| A | 6 | 10/1/2021 | 0 | 20/1/2021 | 10 |
So basically it is a count where each row counts down to the runend closest to the removal_date shown in the table.
I think this can be achieved using window functions; I have managed to add a column that counts all the rows for the equipment, but I still can't figure out how to narrow the window down and then count within it, so that the first row takes the last count and works backwards. This is what I have so far:
from pyspark.sql import Window
from pyspark.sql.functions import asc, col, rank

# Partition by equipment only, so the rank runs across all runs for the equipment
w = Window.partitionBy("equipment").orderBy(asc("runend"))
df = df.withColumn("rank", rank().over(w))
# Just to see what the df looks like
df.where(col("equipment") == "A").groupby("equipment", "run", "rank", "failure", "runend", "removal_date").count().orderBy("equipment", "runend").show()
So I get a table that looks like this, and I think I'm on the right track, but I'm still missing some pieces:
| equipment | run | runend | failure | removal_date | rank |
|---|---|---|---|---|---|
| A | 1 | 1/1/2021 | 0 | 4/1/2021 | 1 |
| A | 2 | 2/1/2021 | 0 | 4/1/2021 | 2 |
| A | 3 | 3/1/2021 | 0 | 4/1/2021 | 3 |
| A | 4 | 4/1/2021 | 1 | 4/1/2021 | 4 |
| A | 5 | 4/1/2021 | 0 | 20/1/2021 | 5 |
| A | 6 | 10/1/2021 | 0 | 20/1/2021 | 6 |
This can be done with a simple datediff function, without using windows at all. Here is the code.
>>> from pyspark.sql import functions as f
>>> df1 = spark.createDataFrame([
("A",1,"1/1/2021",0,"4/1/2021"),
("A",2,"2/1/2021",0,"4/1/2021"),
("A",3,"3/1/2021",0,"4/1/2021"),
("A",4,"4/1/2021",1,"4/1/2021"),
("A",5,"4/1/2021",0,"20/1/2021"),
("A",6,"10/1/2021",0,"20/1/2021")
], schema=["equipment","run","runend","failure","removal_date"]
)
>>> df1.show()
+---------+---+---------+-------+------------+
|equipment|run| runend|failure|removal_date|
+---------+---+---------+-------+------------+
| A| 1| 1/1/2021| 0| 4/1/2021|
| A| 2| 2/1/2021| 0| 4/1/2021|
| A| 3| 3/1/2021| 0| 4/1/2021|
| A| 4| 4/1/2021| 1| 4/1/2021|
| A| 5| 4/1/2021| 0| 20/1/2021|
| A| 6|10/1/2021| 0| 20/1/2021|
+---------+---+---------+-------+------------+
>>> df2 = df1.withColumn("runend", f.to_date(f.col("runend"), "d/M/yyyy")) \
.withColumn("removal_date", f.to_date(f.col("removel_date"), "d/M/yyyy"))
>>> df2.show()
+---------+---+----------+-------+------------+
|equipment|run| runend|failure|removal_date|
+---------+---+----------+-------+------------+
| A| 1|2021-01-01| 0| 2021-01-04|
| A| 2|2021-01-02| 0| 2021-01-04|
| A| 3|2021-01-03| 0| 2021-01-04|
| A| 4|2021-01-04| 1| 2021-01-04|
| A| 5|2021-01-04| 0| 2021-01-20|
| A| 6|2021-01-10| 0| 2021-01-20|
+---------+---+----------+-------+------------+
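A note on the parsing step: the pattern `d/M/yyyy` is day-first, so `10/1/2021` parses as 10 January. Spark 3.x also uses a stricter datetime parser than 2.x, so if `to_date` happens to raise an error on these patterns in your environment (an assumption, only relevant if you actually hit it), the legacy parser can be restored:
>>> # Only needed if Spark 3.x rejects the day-first pattern (restores 2.x parsing)
>>> spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")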
>>> df3 = df2.withColumn("RUL", f.datediff(f.col("removal_date"), f.col("runend")))
>>> df3.show()
+---------+---+----------+-------+------------+---+
|equipment|run| runend|failure|removal_date|RUL|
+---------+---+----------+-------+------------+---+
| A| 1|2021-01-01| 0| 2021-01-04| 3|
| A| 2|2021-01-02| 0| 2021-01-04| 2|
| A| 3|2021-01-03| 0| 2021-01-04| 1|
| A| 4|2021-01-04| 1| 2021-01-04| 0|
| A| 5|2021-01-04| 0| 2021-01-20| 16|
| A| 6|2021-01-10| 0| 2021-01-20| 10|
+---------+---+----------+-------+------------+---+
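For completeness, since the question asked about windows: if you specifically wanted the backwards row count within each removal group rather than calendar days, a minimal sketch could look like this. Note it counts rows, not days, so for runs 5 and 6 it would give 1 and 0 rather than the expected 16 and 10; the datediff approach above is what matches the desired output.
>>> from pyspark.sql import Window
>>> # Rank runs by runend descending within each removal group, so the run
>>> # closest to removal_date gets 0 and earlier runs count up backwards
>>> w = Window.partitionBy("equipment", "removal_date").orderBy(f.col("runend").desc())
>>> df4 = df2.withColumn("countdown", f.row_number().over(w) - 1)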