Pyspark 有条件地增加列的索引

Pyspark conditionally increasing the index of a column

有没有一种好的方法来创建一个条件,使我可以为 session 列实现以下结果:

+------+-----------------------+-----------------------+------+-----------+
|userId|timestamp              |timestamp_prev         |diff  |session    |
+------+-----------------------+-----------------------+------+-----------+
|123456|2022-01-15 19:30:21.789|2022-01-15 19:29:48.18 |33    |Session 2|
|123456|2022-01-15 19:29:48.18 |2022-01-15 19:29:16.933|32    |Session 2|
|123456|2022-01-15 19:29:16.933|2022-01-15 19:29:08.062|8     |Session 2|
|123456|2022-01-15 19:29:08.062|2022-01-14 08:00:33.126|127715|Session 2|
|123456|2022-01-14 08:00:33.126|2022-01-14 08:00:30.807|3     |Session 1|
|123456|2022-01-14 08:00:30.807|2022-01-14 08:00:12.627|18    |Session 1|
|123456|2022-01-14 08:00:12.627|2022-01-14 08:00:09.703|3     |Session 1|

我现在有以下代码可以部分完成我想要的,但条件设置不正确。 有什么提示吗? 也应该按每个 userId 进行分区。

threshold = 1000 #sec

df_1 = df\
    .withColumn('timestamp', to_timestamp('timestamp'))\
    .withColumn('timestamp_prev', to_timestamp('timestamp_prev'))\
    .withColumn("diff", 
                when(isnull(unix_timestamp('timestamp') - unix_timestamp('timestamp_prev')), 0)
                .otherwise(unix_timestamp('timestamp') - unix_timestamp('timestamp_prev')))
df_2 = df_1\
        .withColumn('session',
                    when(df.diff > threshold, 'New session')
                    .otherwise('Old session'))

+------+-----------------------+-----------------------+------+-----------+
|userId|timestamp              |timestamp_prev         |diff  |session    |
+------+-----------------------+-----------------------+------+-----------+
|123456|2022-01-15 19:30:21.789|2022-01-15 19:29:48.18 |33    |Old session|
|123456|2022-01-15 19:29:48.18 |2022-01-15 19:29:16.933|32    |Old session|
|123456|2022-01-15 19:29:16.933|2022-01-15 19:29:08.062|8     |Old session|
|123456|2022-01-15 19:29:08.062|2022-01-14 08:00:33.126|127715|New session|
|123456|2022-01-14 08:00:33.126|2022-01-14 08:00:30.807|3     |Old session|
|123456|2022-01-14 08:00:30.807|2022-01-14 08:00:12.627|18    |Old session|
|123456|2022-01-14 08:00:12.627|2022-01-14 08:00:09.703|3     |Old session|

我在之前的 post ()

中找到了我自己的问题的解决方案
df2 = df\
    .withColumn('to_add_number',
                when(df.diff > threshold, 1)
                .otherwise(0))\
    .withColumn("order_id", 
                monotonically_increasing_id())\
    .withColumn("session", 
                sum("to_add_number").over(Window().partitionBy("userId").orderBy("order_id"))+1)\
    .orderBy('order_id')\
    .drop('to_add_number', 'order_id')