如何使用 Pyspark 标记 window 的最后一行
How to flag last rows from window using Pyspark
我的目标是创建一个新列is_end(什么时候是最后一个,前一个p_uuid是Null()然后is_end=1否则=0。我不知道如何组合 When() 和 last() 函数。
我尝试了几次与 windows 结合,但总是出错 :(
df = spark.createDataFrame([
(1, 110, None, '2019-09-28'),
(2, 110, None, '2019-09-28'),
(3, 110, 'aaa', '2019-09-28'),
(4, 110, None, '2019-09-17'),
(5, 110, None, '2019-09-17'),
(6, 110, 'bbb', '2019-09-17'),
(7, 110, None, '2019-09-01'),
(8, 110, None, '2019-09-01'),
(9, 110, None, '2019-09-01'),
(10, 110, None, '2019-09-01'),
(11, 110, 'ccc', '2019-09-01'),
(12, 110, None, '2019-09-01'),
(13, 110, None, '2019-09-01'),
(14, 110, None, '2019-09-01')
],
['idx', 'u_uuid', 'p_uuid', 'timestamp']
)
df.show()
我的数据框:
+---+------+------+----------+
|idx|u_uuid|p_uuid| timestamp|
+---+------+------+----------+
| 1| 110| null|2019-09-28|
| 2| 110| null|2019-09-28|
| 3| 110| aaa|2019-09-28|
| 4| 110| null|2019-09-17|
| 5| 110| null|2019-09-17|
| 6| 110| bbb|2019-09-17|
| 7| 110| null|2019-09-01|
| 8| 110| null|2019-09-01|
| 9| 110| null|2019-09-01|
| 10| 110| null|2019-09-01|
| 11| 110| ccc|2019-09-01|
| 12| 110| null|2019-09-01|
| 13| 110| null|2019-09-01|
| 14| 110| null|2019-09-01|
+---+------+------+----------+
w = Window.partitionBy("u_uuid").orderBy(col("timestamp"))
df.withColumn("p_uuid", when( lag(F.col("p_uuid").isNull()).over(w), 1).otherwise(0))
我在找什么:
+---+------+------+----------+------+
|idx|u_uuid|p_uuid| timestamp|is_end|
+---+------+------+----------+------+
| 1| 110| null|2019-09-28| 0|
| 2| 110| null|2019-09-28| 0|
| 3| 110| aaa|2019-09-28| 0|
| 4| 110| null|2019-09-17| 0|
| 5| 110| null|2019-09-17| 0|
| 6| 110| bbb|2019-09-17| 0|
| 7| 110| null|2019-09-01| 0|
| 8| 110| null|2019-09-01| 0|
| 9| 110| null|2019-09-01| 0|
| 10| 110| null|2019-09-01| 0|
| 11| 110| ccc|2019-09-01| 0|
| 12| 110| null|2019-08-29| 1|
| 13| 110| null|2019-08-29| 1|
| 14| 110| null|2019-08-29| 1|
波纹管是 pyspark sql 关联到您的案例:
w = (Window
.partitionBy("u_uuid")
.orderBy("timestamp"))
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df.withColumn("is_end", F.when(F.last("p_uuid", True).over(w).isNull() & F.col("p_uuid").isNull(), F.lit(1)).otherwise(F.lit(0)))\
.show()
我的目标是创建一个新列is_end(什么时候是最后一个,前一个p_uuid是Null()然后is_end=1否则=0。我不知道如何组合 When() 和 last() 函数。
我尝试了几次与 windows 结合,但总是出错 :(
df = spark.createDataFrame([
(1, 110, None, '2019-09-28'),
(2, 110, None, '2019-09-28'),
(3, 110, 'aaa', '2019-09-28'),
(4, 110, None, '2019-09-17'),
(5, 110, None, '2019-09-17'),
(6, 110, 'bbb', '2019-09-17'),
(7, 110, None, '2019-09-01'),
(8, 110, None, '2019-09-01'),
(9, 110, None, '2019-09-01'),
(10, 110, None, '2019-09-01'),
(11, 110, 'ccc', '2019-09-01'),
(12, 110, None, '2019-09-01'),
(13, 110, None, '2019-09-01'),
(14, 110, None, '2019-09-01')
],
['idx', 'u_uuid', 'p_uuid', 'timestamp']
)
df.show()
我的数据框:
+---+------+------+----------+
|idx|u_uuid|p_uuid| timestamp|
+---+------+------+----------+
| 1| 110| null|2019-09-28|
| 2| 110| null|2019-09-28|
| 3| 110| aaa|2019-09-28|
| 4| 110| null|2019-09-17|
| 5| 110| null|2019-09-17|
| 6| 110| bbb|2019-09-17|
| 7| 110| null|2019-09-01|
| 8| 110| null|2019-09-01|
| 9| 110| null|2019-09-01|
| 10| 110| null|2019-09-01|
| 11| 110| ccc|2019-09-01|
| 12| 110| null|2019-09-01|
| 13| 110| null|2019-09-01|
| 14| 110| null|2019-09-01|
+---+------+------+----------+
w = Window.partitionBy("u_uuid").orderBy(col("timestamp"))
df.withColumn("p_uuid", when( lag(F.col("p_uuid").isNull()).over(w), 1).otherwise(0))
我在找什么:
+---+------+------+----------+------+
|idx|u_uuid|p_uuid| timestamp|is_end|
+---+------+------+----------+------+
| 1| 110| null|2019-09-28| 0|
| 2| 110| null|2019-09-28| 0|
| 3| 110| aaa|2019-09-28| 0|
| 4| 110| null|2019-09-17| 0|
| 5| 110| null|2019-09-17| 0|
| 6| 110| bbb|2019-09-17| 0|
| 7| 110| null|2019-09-01| 0|
| 8| 110| null|2019-09-01| 0|
| 9| 110| null|2019-09-01| 0|
| 10| 110| null|2019-09-01| 0|
| 11| 110| ccc|2019-09-01| 0|
| 12| 110| null|2019-08-29| 1|
| 13| 110| null|2019-08-29| 1|
| 14| 110| null|2019-08-29| 1|
波纹管是 pyspark sql 关联到您的案例:
w = (Window
.partitionBy("u_uuid")
.orderBy("timestamp"))
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df.withColumn("is_end", F.when(F.last("p_uuid", True).over(w).isNull() & F.col("p_uuid").isNull(), F.lit(1)).otherwise(F.lit(0)))\
.show()