如何使用 Pyspark 标记 window 的最后一行

How to flag last rows from window using Pyspark

我的目标是创建一个新列is_end(什么时候是最后一个,前一个p_uuid是Null()然后is_end=1否则=0。我不知道如何组合 When() 和 last() 函数。

我尝试了几次与 windows 结合,但总是出错 :(

df = spark.createDataFrame([
                        (1, 110, None, '2019-09-28'),
                        (2, 110, None, '2019-09-28'),
                        (3, 110, 'aaa', '2019-09-28'),
                        (4, 110, None, '2019-09-17'),
                        (5, 110, None, '2019-09-17'),
                        (6, 110, 'bbb', '2019-09-17'),
                        (7, 110, None, '2019-09-01'),
                        (8, 110, None, '2019-09-01'),
                        (9, 110, None, '2019-09-01'),
                        (10, 110, None, '2019-09-01'),
                        (11, 110, 'ccc', '2019-09-01'),
                        (12, 110, None, '2019-09-01'),
                        (13, 110, None, '2019-09-01'),
                        (14, 110, None, '2019-09-01')
                    ],
                    ['idx', 'u_uuid', 'p_uuid', 'timestamp']
                )
df.show()

我的数据框:

+---+------+------+----------+
|idx|u_uuid|p_uuid| timestamp|
+---+------+------+----------+
|  1|   110|  null|2019-09-28|
|  2|   110|  null|2019-09-28|
|  3|   110|   aaa|2019-09-28|
|  4|   110|  null|2019-09-17|
|  5|   110|  null|2019-09-17|
|  6|   110|   bbb|2019-09-17|
|  7|   110|  null|2019-09-01|
|  8|   110|  null|2019-09-01|
|  9|   110|  null|2019-09-01|
| 10|   110|  null|2019-09-01|
| 11|   110|   ccc|2019-09-01|
| 12|   110|  null|2019-09-01|
| 13|   110|  null|2019-09-01|
| 14|   110|  null|2019-09-01|
+---+------+------+----------+

w = Window.partitionBy("u_uuid").orderBy(col("timestamp"))
df.withColumn("p_uuid", when( lag(F.col("p_uuid").isNull()).over(w), 1).otherwise(0))

我在找什么:

+---+------+------+----------+------+
|idx|u_uuid|p_uuid| timestamp|is_end|
+---+------+------+----------+------+
|  1|   110|  null|2019-09-28|     0|
|  2|   110|  null|2019-09-28|     0|
|  3|   110|   aaa|2019-09-28|     0|
|  4|   110|  null|2019-09-17|     0|
|  5|   110|  null|2019-09-17|     0|
|  6|   110|   bbb|2019-09-17|     0|
|  7|   110|  null|2019-09-01|     0|
|  8|   110|  null|2019-09-01|     0|
|  9|   110|  null|2019-09-01|     0|
| 10|   110|  null|2019-09-01|     0|
| 11|   110|   ccc|2019-09-01|     0|
| 12|   110|  null|2019-08-29|     1|
| 13|   110|  null|2019-08-29|     1|
| 14|   110|  null|2019-08-29|     1|

波纹管是 pyspark sql 关联到您的案例:

w = (Window
    .partitionBy("u_uuid")
    .orderBy("timestamp"))
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn("is_end", F.when(F.last("p_uuid", True).over(w).isNull() & F.col("p_uuid").isNull(), F.lit(1)).otherwise(F.lit(0)))\
    .show()