Pyspark Increment the timestamp column based on row_number value

I am pulling data from an Event Hub. Each packet contains 10 records, and each packet carries a single timestamp. I want to explode each 10-record packet and assign the packet timestamp to every record, incremented by 1 second per record when partitioned by EnqueuedTimeUtc and vehicleid.

Below is my intermediate data in a DataFrame.


df.show()

+-------------------+---------------+-------------------+
|    EnqueuedTimeUtc|      vehicleid|   datetime_pkt    |
+-------------------+---------------+-------------------+
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
+-------------------+---------------+-------------------+

Expected output

+-------------------+---------------+-------------------+-------------------+
|    EnqueuedTimeUtc|      vehicleid|   datetime_pkt    | nw_datetime_pkt   |
+-------------------+---------------+-------------------+-------------------+
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:44|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:45|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:46|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:47|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:48|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:49|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:50|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:51|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:52|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:49|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:50|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:51|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:52|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:53|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:54|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:55|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:56|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:57|
+-------------------+---------------+-------------------+-------------------+

I was able to solve this task using window functions.

Steps:

  1. Add a row_number over the partitionBy columns and subtract 1, so the row number starts at 0 instead of 1.
  2. Use the lag function to create a new column nw_datetime_pkt.
  3. Use the unix_timestamp function to add the row-number offset, in seconds, to the timestamp column.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("vehicleid", "datetime_pkt").orderBy("datetime_pkt")

# row_number() starts at 1, so subtract 1 to get a 0-based offset within each packet
df1 = df.withColumn("rn", F.row_number().over(w) - 1) \
        .withColumn("nw_datetime_pkt", F.lag(F.col("datetime_pkt")).over(w))

# The first row of each packet (where lag is null) keeps the original timestamp;
# every other row gets datetime_pkt shifted forward by rn seconds.
df1 = df1.withColumn(
    "nw_datetime_pkt",
    F.when(F.col("nw_datetime_pkt").isNull(), F.col("datetime_pkt"))
     .otherwise((F.unix_timestamp("nw_datetime_pkt") + df1.rn).cast("timestamp")),
)
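As a sanity check on the per-row arithmetic the Spark expression performs (packet timestamp plus the 0-based row offset, in seconds), here is a minimal standard-library sketch. The helper name `increment_packet` is illustrative and not part of the original code:

```python
from datetime import datetime, timedelta

def increment_packet(pkt_ts: str, n_records: int) -> list:
    """Return n_records timestamps: pkt_ts, pkt_ts+1s, ..., pkt_ts+(n-1)s."""
    base = datetime.strptime(pkt_ts, "%Y-%m-%d %H:%M:%S")
    return [
        (base + timedelta(seconds=rn)).strftime("%Y-%m-%d %H:%M:%S")
        for rn in range(n_records)
    ]

# A 10-record packet stamped 2022-05-01 07:19:43
stamps = increment_packet("2022-05-01 07:19:43", 10)
print(stamps[0])   # 2022-05-01 07:19:43
print(stamps[-1])  # 2022-05-01 07:19:52
```

Note that, within each (vehicleid, datetime_pkt) partition, the lag value is always either null or equal to datetime_pkt itself, so the same result could be computed directly as `F.unix_timestamp("datetime_pkt") + rn` without the lag step.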