PySpark ranking over a partition: where did I go wrong?

I have a dataset df as follows:

ID date        class
1  2020/01/02   [math,english]
1  2020/01/03   [math,english]
1  2020/01/04   [math,english]
2  2020/01/02   [math,english]
2  2020/01/03   [math,english,art]
2  2020/01/04   [math,english]
2  2020/01/05   [math,english,art]
2  2020/01/06   [math,art]
2  2020/01/07   [math,art] 
2  2020/01/08   [math,english,art]
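
For reference, a minimal sketch to reproduce this sample DataFrame (assuming class is an array<string> column and date is stored as a plain string; adjust the types if your real schema differs):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows copied from the table above; `class` is built as an array<string>
data = [
    (1, "2020/01/02", ["math", "english"]),
    (1, "2020/01/03", ["math", "english"]),
    (1, "2020/01/04", ["math", "english"]),
    (2, "2020/01/02", ["math", "english"]),
    (2, "2020/01/03", ["math", "english", "art"]),
    (2, "2020/01/04", ["math", "english"]),
    (2, "2020/01/05", ["math", "english", "art"]),
    (2, "2020/01/06", ["math", "art"]),
    (2, "2020/01/07", ["math", "art"]),
    (2, "2020/01/08", ["math", "english", "art"]),
]
df = spark.createDataFrame(data, ["ID", "date", "class"])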

My current code is:

df.withColumn("c_order", rank()
    .over(Window.partitionBy("ID", "date")
    .orderBy("class")))

I also tried dense_rank() and row_number(), but none of them gives the desired output.

df.withColumn("c_order", dense_rank()
    .over(Window.partitionBy("ID", "date")
    .orderBy("class")))

df.withColumn("c_order", row_number()
    .over(Window.partitionBy("ID", "date")
    .orderBy("class")))

My current output is as follows:

ID   date        class                c_order
1  2020/01/02   [math,english]           1
1  2020/01/03   [math,english]           1
1  2020/01/04   [math,english]           1
2  2020/01/02   [math,english]           1
2  2020/01/03   [math,english,art]       1
2  2020/01/04   [math,english]           1
2  2020/01/05   [math,english,art]       1
2  2020/01/06   [math,art]               1
2  2020/01/07   [math,art]               1
2  2020/01/08   [math,english,art]       1

The output I want is as follows:

ID   date        class                c_order
1  2020/01/02   [math,english]           1
1  2020/01/03   [math,english]           1
1  2020/01/04   [math,english]           1
2  2020/01/02   [math,english]           1
2  2020/01/03   [math,english,art]       2
2  2020/01/04   [math,english]           3
2  2020/01/05   [math,english,art]       4
2  2020/01/06   [math,art]               5
2  2020/01/07   [math,art]               5
2  2020/01/08   [math,english,art]       6

The order should only increase when class changes. Any idea where I went wrong?

Thanks!

You can't get this with a ranking function alone: because you partition by both ID and date, each partition contains a single row, so rank(), dense_rank(), and row_number() all return 1. Instead, compare each row with the previous row (using lag) to flag where class changes, then take a running sum of those flags.

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'diff',
    # Flag rows whose class differs from the previous row (per ID, ordered by date).
    # The first row of each ID has no previous row, so the null comparison is
    # coalesced to False before casting the flag to 0/1.
    F.coalesce(
        F.col('class') != F.lag('class').over(Window.partitionBy('ID').orderBy('date')),
        F.lit(False)
    ).cast('int')
).withColumn(
    'c_order',
    # Running sum of the change flags gives a counter that only increments when
    # class changes; +1 makes it start at 1.
    F.sum('diff').over(Window.partitionBy('ID').orderBy('date')) + 1
)

df2.show()
+---+----------+------------------+----+-------+
| ID|      date|             class|diff|c_order|
+---+----------+------------------+----+-------+
|  1|2020/01/02|    [math,english]|   0|      1|
|  1|2020/01/03|    [math,english]|   0|      1|
|  1|2020/01/04|    [math,english]|   0|      1|
|  2|2020/01/02|    [math,english]|   0|      1|
|  2|2020/01/03|[math,english,art]|   1|      2|
|  2|2020/01/04|    [math,english]|   1|      3|
|  2|2020/01/05|[math,english,art]|   1|      4|
|  2|2020/01/06|        [math,art]|   1|      5|
|  2|2020/01/07|        [math,art]|   0|      5|
|  2|2020/01/08|[math,english,art]|   1|      6|
+---+----------+------------------+----+-------+
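
If you don't need the intermediate diff column in the final result, you can drop it afterwards, for example:

result = df2.drop('diff')
result.show()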