PySpark: Increment a timestamp column based on the row_number value
I am pulling data from an Event Hub. Each packet contains 10 records, and each packet carries a single timestamp. I want to explode the 10-record packets and assign the packet timestamp to every record, incremented by 1 second per record, when partitioned by EnqueuedTimeUtc and vehicleid.
Below is my intermediate data in a DataFrame:
df.show()
+-------------------+---------------+-------------------+
| EnqueuedTimeUtc| vehicleid| datetime_pkt |
+-------------------+---------------+-------------------+
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|
+-------------------+---------------+-------------------+
Expected output:
+-------------------+---------------+-------------------+-------------------+
| EnqueuedTimeUtc| vehicleid| datetime_pkt | nw_datetime_pkt |
+-------------------+---------------+-------------------+-------------------+
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:44|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:45|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:46|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:47|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:48|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:49|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:50|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:51|
|5/1/2022 7:19:46 AM|86135903910 |2022-05-01 07:19:43|2022-05-01 07:19:52|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:49|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:50|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:51|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:52|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:53|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:54|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:55|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:56|
|5/1/2022 7:19:49 AM|86135903910 |2022-05-01 07:19:48|2022-05-01 07:19:57|
+-------------------+---------------+-------------------+-------------------+
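For reference, a minimal sketch that rebuilds the input DataFrame above so the steps below can be tried locally (this setup is my own assumption; only the column names and values come from the tables):
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Two packets of 10 identical rows each, matching df.show() above.
df = spark.createDataFrame(
    [("5/1/2022 7:19:46 AM", "86135903910", "2022-05-01 07:19:43")] * 10
    + [("5/1/2022 7:19:49 AM", "86135903910", "2022-05-01 07:19:48")] * 10,
    ["EnqueuedTimeUtc", "vehicleid", "datetime_pkt"],
).withColumn("datetime_pkt", F.to_timestamp("datetime_pkt"))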
I was able to solve the task above using window functions.
Steps:
- Add a row_number over the partitionBy columns and subtract 1 so the row number starts at 0 instead of 1.
- Use the lag function to create a new column, nw_datetime_pkt.
- Use the unix_timestamp function to convert the timestamp column to epoch seconds, add the number of seconds to increment by, and cast back to a timestamp.
import pyspark.sql.functions as F
from pyspark.sql import Window

# One window per packet: every row of a packet shares (vehicleid, datetime_pkt).
w = Window.partitionBy("vehicleid", "datetime_pkt").orderBy("datetime_pkt")

df1 = df.withColumn("rn", F.row_number().over(w) - 1) \
    .withColumn("nw_datetime_pkt", F.lag(F.col("datetime_pkt")).over(w))
# The first row of a packet (lag is null) keeps the packet timestamp; every later row gets it shifted by rn seconds.
df1 = df1.withColumn("nw_datetime_pkt", F.when(F.col("nw_datetime_pkt").isNull(), F.col("datetime_pkt")).otherwise((F.unix_timestamp("nw_datetime_pkt") + df1.rn).cast("timestamp")))
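Because the lag over a window keyed by datetime_pkt is either null (first row of a packet) or equal to datetime_pkt itself, the lag and the when/otherwise can be dropped entirely; a simpler equivalent sketch, reusing the window w from above:
# Shift the packet timestamp by the 0-based row number, in seconds.
df1 = df.withColumn("rn", F.row_number().over(w) - 1) \
    .withColumn("nw_datetime_pkt", (F.unix_timestamp("datetime_pkt") + F.col("rn")).cast("timestamp"))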