PySpark: conditionally increasing the index of a column
Is there a good way to build a condition that produces the following result in the session column?
+------+-----------------------+-----------------------+------+---------+
|userId|timestamp              |timestamp_prev         |diff  |session  |
+------+-----------------------+-----------------------+------+---------+
|123456|2022-01-15 19:30:21.789|2022-01-15 19:29:48.18 |33    |Session 2|
|123456|2022-01-15 19:29:48.18 |2022-01-15 19:29:16.933|32    |Session 2|
|123456|2022-01-15 19:29:16.933|2022-01-15 19:29:08.062|8     |Session 2|
|123456|2022-01-15 19:29:08.062|2022-01-14 08:00:33.126|127715|Session 2|
|123456|2022-01-14 08:00:33.126|2022-01-14 08:00:30.807|3     |Session 1|
|123456|2022-01-14 08:00:30.807|2022-01-14 08:00:12.627|18    |Session 1|
|123456|2022-01-14 08:00:12.627|2022-01-14 08:00:09.703|3     |Session 1|
+------+-----------------------+-----------------------+------+---------+
The code below gets me part of the way there, but the condition is not set up correctly. Any hints? The result should also be partitioned by userId.
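from pyspark.sql.functions import col, isnull, to_timestamp, unix_timestamp, when

threshold = 1000  # seconds

# Parse both columns as timestamps and compute the gap in seconds (0 when the gap is null)
df_1 = df\
    .withColumn('timestamp', to_timestamp('timestamp'))\
    .withColumn('timestamp_prev', to_timestamp('timestamp_prev'))\
    .withColumn("diff",
        when(isnull(unix_timestamp('timestamp') - unix_timestamp('timestamp_prev')), 0)
        .otherwise(unix_timestamp('timestamp') - unix_timestamp('timestamp_prev')))

# Each row is flagged on its own, so this marks session starts but does not number the sessions
df_2 = df_1\
    .withColumn('session',
        when(col('diff') > threshold, 'New session')
        .otherwise('Old session'))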
+------+-----------------------+-----------------------+------+-----------+
|userId|timestamp              |timestamp_prev         |diff  |session    |
+------+-----------------------+-----------------------+------+-----------+
|123456|2022-01-15 19:30:21.789|2022-01-15 19:29:48.18 |33    |Old session|
|123456|2022-01-15 19:29:48.18 |2022-01-15 19:29:16.933|32    |Old session|
|123456|2022-01-15 19:29:16.933|2022-01-15 19:29:08.062|8     |Old session|
|123456|2022-01-15 19:29:08.062|2022-01-14 08:00:33.126|127715|New session|
|123456|2022-01-14 08:00:33.126|2022-01-14 08:00:30.807|3     |Old session|
|123456|2022-01-14 08:00:30.807|2022-01-14 08:00:12.627|18    |Old session|
|123456|2022-01-14 08:00:12.627|2022-01-14 08:00:09.703|3     |Old session|
+------+-----------------------+-----------------------+------+-----------+
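The flag marks where a session starts, but the desired column needs a number that goes up by one at each of those starts. One way to think about it is as a running total of the flags per user, taken in chronological order. Here is a plain-Python sketch of that idea, without Spark; the function name `assign_sessions` and the tuple layout are hypothetical and only serve the illustration, and the timestamps are replaced by simple ordinal positions.

```python
from collections import defaultdict


def assign_sessions(events, threshold=1000):
    """Number sessions per user.

    events: iterable of (user_id, timestamp, diff_in_seconds) tuples.
    Returns a dict mapping (user_id, timestamp) to a 1-based session index.
    """
    counters = defaultdict(lambda: 1)  # every user starts in session 1
    sessions = {}
    # Walk each user's events oldest first so the counter only grows with time.
    for user_id, ts, diff in sorted(events, key=lambda e: (e[0], e[1])):
        if diff > threshold:
            counters[user_id] += 1  # a long gap starts a new session
        sessions[(user_id, ts)] = counters[user_id]
    return sessions


# Gaps in seconds from the table above, oldest event first;
# the position in the list stands in for the real timestamp.
events = [(123456, t, d) for t, d in enumerate([3, 18, 3, 127715, 8, 32, 33])]
labels = assign_sessions(events)
print([labels[(123456, t)] for t in range(7)])  # [1, 1, 1, 2, 2, 2, 2]
```

The row with the 127715-second gap is the first row of session 2, which matches the desired table.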
I found a solution to my own question in an earlier post:
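from pyspark.sql import Window
from pyspark.sql.functions import col, monotonically_increasing_id, sum as sum_, when

# df must already contain the diff column (as df_1 does)
df2 = df\
    .withColumn('to_add_number',
        when(col('diff') > threshold, 1)
        .otherwise(0))\
    .withColumn("order_id",
        monotonically_increasing_id())\
    .withColumn("session",
        # running total of the flags per user; +1 so numbering starts at 1
        sum_("to_add_number").over(Window.partitionBy("userId").orderBy("order_id")) + 1)\
    .orderBy('order_id')\
    .drop('to_add_number', 'order_id')
# Note: order_id follows the current row order of df, not the timestamps,
# so the session numbers depend on how df is ordered when this runs.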