SQL 或 Pyspark - 获取列最后一次对每个 ID 具有不同值的时间
SQL or Pyspark - Get the last time a column had a different value for each ID
我正在使用 pyspark,所以我尝试了 pyspark 代码和 SQL。
我正在尝试获取 ADDRESS 列为不同值的时间,按 USER_ID 分组。行按 TIME 排序。采取以下 table:
+---+-------+-------+----+
| ID|USER_ID|ADDRESS|TIME|
+---+-------+-------+----+
| 1| 1| A| 10|
| 2| 1| B| 15|
| 3| 1| A| 20|
| 4| 1| A| 40|
| 5| 1| A| 45|
+---+-------+-------+----+
我想要的正确的新专栏如下:
+---+-------+-------+----+---------+
| ID|USER_ID|ADDRESS|TIME|LAST_DIFF|
+---+-------+-------+----+---------+
| 1| 1| A| 10| null|
| 2| 1| B| 15| 10|
| 3| 1| A| 20| 15|
| 4| 1| A| 40| 15|
| 5| 1| A| 45| 15|
+---+-------+-------+----+---------+
我曾尝试使用不同的 windows,但 none 似乎总能得到我想要的。有什么想法吗?
使用两个 Window 规范的一种方法:
from pyspark.sql.functions import when, col, lag, sum as fsum
from pyspark.sql import Window
w1 = Window.partitionBy('USER_ID').orderBy('ID')
w2 = Window.partitionBy('USER_ID').orderBy('g')
# create a new sub-group label based on the values of ADDRESS and Previous ADDRESS
df1 = df.withColumn('g', fsum(when(col('ADDRESS') == lag('ADDRESS').over(w1), 0).otherwise(1)).over(w1))
# group by USER_ID and the above sub-group label and calculate the sum of time in the group as diff
# calculate the last_diff and then join the data back to the df1
df2 = df1.groupby('USER_ID', 'g').agg(fsum('Time').alias('diff')).withColumn('last_diff', lag('diff').over(w2))
df1.join(df2, on=['USER_ID', 'g']).show()
+-------+---+---+-------+----+----+---------+
|USER_ID| g| ID|ADDRESS|TIME|diff|last_diff|
+-------+---+---+-------+----+----+---------+
| 1| 1| 1| A| 10| 10| null|
| 1| 2| 2| B| 15| 15| 10|
| 1| 3| 3| A| 20| 105| 15|
| 1| 3| 4| A| 40| 105| 15|
| 1| 3| 5| A| 45| 105| 15|
+-------+---+---+-------+----+----+---------+
df_new = df1.join(df2, on=['USER_ID', 'g']).drop('g', 'diff')
@jxc 答案的简化版本。
from pyspark.sql.functions import *
from pyspark.sql import Window
#Window definition
w = Window.partitionBy(col('user_id')).orderBy(col('id'))
#Getting the previous time and classifying rows into groups
grp_df = df.withColumn('grp',sum(when(lag(col('address')).over(w) == col('address'),0).otherwise(1)).over(w)) \
.withColumn('prev_time',lag(col('time')).over(w))
#Window definition with groups
w_grp = Window.partitionBy(col('user_id'),col('grp')).orderBy(col('id'))
grp_df.withColumn('last_addr_change_time',min(col('prev_time')).over(w_grp)).show()
- 使用
lag
和 运行 sum
在列值发生变化时分配组(基于定义的 window)。从上一行中获取时间,将在下一步中使用。
- 获取组后,使用 运行
min
imum 获取列值更改的最后时间戳。 (建议您查看中间结果以更好地理解转换)
我正在使用 pyspark,所以我尝试了 pyspark 代码和 SQL。
我正在尝试获取 ADDRESS 列为不同值的时间,按 USER_ID 分组。行按 TIME 排序。采取以下 table:
+---+-------+-------+----+
| ID|USER_ID|ADDRESS|TIME|
+---+-------+-------+----+
| 1| 1| A| 10|
| 2| 1| B| 15|
| 3| 1| A| 20|
| 4| 1| A| 40|
| 5| 1| A| 45|
+---+-------+-------+----+
我想要的正确的新专栏如下:
+---+-------+-------+----+---------+
| ID|USER_ID|ADDRESS|TIME|LAST_DIFF|
+---+-------+-------+----+---------+
| 1| 1| A| 10| null|
| 2| 1| B| 15| 10|
| 3| 1| A| 20| 15|
| 4| 1| A| 40| 15|
| 5| 1| A| 45| 15|
+---+-------+-------+----+---------+
我曾尝试使用不同的 windows,但 none 似乎总能得到我想要的。有什么想法吗?
使用两个 Window 规范的一种方法:
from pyspark.sql.functions import when, col, lag, sum as fsum
from pyspark.sql import Window
w1 = Window.partitionBy('USER_ID').orderBy('ID')
w2 = Window.partitionBy('USER_ID').orderBy('g')
# create a new sub-group label based on the values of ADDRESS and Previous ADDRESS
df1 = df.withColumn('g', fsum(when(col('ADDRESS') == lag('ADDRESS').over(w1), 0).otherwise(1)).over(w1))
# group by USER_ID and the above sub-group label and calculate the sum of time in the group as diff
# calculate the last_diff and then join the data back to the df1
df2 = df1.groupby('USER_ID', 'g').agg(fsum('Time').alias('diff')).withColumn('last_diff', lag('diff').over(w2))
df1.join(df2, on=['USER_ID', 'g']).show()
+-------+---+---+-------+----+----+---------+
|USER_ID| g| ID|ADDRESS|TIME|diff|last_diff|
+-------+---+---+-------+----+----+---------+
| 1| 1| 1| A| 10| 10| null|
| 1| 2| 2| B| 15| 15| 10|
| 1| 3| 3| A| 20| 105| 15|
| 1| 3| 4| A| 40| 105| 15|
| 1| 3| 5| A| 45| 105| 15|
+-------+---+---+-------+----+----+---------+
df_new = df1.join(df2, on=['USER_ID', 'g']).drop('g', 'diff')
@jxc 答案的简化版本。
from pyspark.sql.functions import *
from pyspark.sql import Window
#Window definition
w = Window.partitionBy(col('user_id')).orderBy(col('id'))
#Getting the previous time and classifying rows into groups
grp_df = df.withColumn('grp',sum(when(lag(col('address')).over(w) == col('address'),0).otherwise(1)).over(w)) \
.withColumn('prev_time',lag(col('time')).over(w))
#Window definition with groups
w_grp = Window.partitionBy(col('user_id'),col('grp')).orderBy(col('id'))
grp_df.withColumn('last_addr_change_time',min(col('prev_time')).over(w_grp)).show()
- 使用
lag
和 运行sum
在列值发生变化时分配组(基于定义的 window)。从上一行中获取时间,将在下一步中使用。 - 获取组后,使用 运行
min
imum 获取列值更改的最后时间戳。 (建议您查看中间结果以更好地理解转换)