在 pyspark 中创建查找列

Create a lookup column in pyspark

我正在尝试在 pyspark 数据框中创建一个新列,该列“查找”同一数据框中的下一个值,并将其复制到所有下一行,直到下一个事件发生。

我已经使用了如下的窗口函数,但仍然无法获得该列的下一个值:

condition = (col("col2") == 'event_start_ind')
w=Window.partitionBy("col2").orderBy(*[when(condition, lit(1)).desc()])

df.select(["timestamp",
           "col1",
           "col2",
           "col3"
          ]).withColumn("col4", when(condition, lead("col3",1).over(w))) \
.orderBy("timestamp") \
.show(500, truncate=False)

显然它不会正确查找“下一个”事件。关于可能的方法有什么想法吗?

示例数据框为:

timestamp col1 col2 col3
2021-02-02 01:03:55 s1 null null
2021-02-02 01:04:16.952854 s1 other_ind null
2021-02-02 01:04:32.398155 s1 null null
2021-02-02 01:04:53.793089 s1 event_start_ind event_1_value
2021-02-02 01:05:10.936913 s1 null null
2021-02-02 01:05:36 s1 other_ind null
2021-02-02 01:05:42 s1 null null
2021-02-02 01:05:43 s1 null null
2021-02-02 01:05:44 s1 event_start_ind event_2_value
2021-02-02 01:05:46.623198 s1 null null
2021-02-02 01:06:50 s1 null null
2021-02-02 01:07:19.607685 s1 null null

期望的结果是:

timestamp col1 col2 col3 col4
2021-02-02 01:03:55 s1 null null event_1_value
2021-02-02 01:04:16.952854 s1 other_ind null event_1_value
2021-02-02 01:04:32.398155 s1 null null event_1_value
2021-02-02 01:04:53.793089 s1 event_start_ind event_1_value event_1_value
2021-02-02 01:05:10.936913 s1 null null event_2_value
2021-02-02 01:05:36 s1 other_ind null event_2_value
2021-02-02 01:05:42 s1 null null event_2_value
2021-02-02 01:05:43 s1 null null event_2_value
2021-02-02 01:05:44 s1 event_start_ind event_2_value event_2_value
2021-02-02 01:05:46.623198 s1 null null null
2021-02-02 01:06:50 s1 null null null
2021-02-02 01:07:19.607685 s1 null null null

您的 window 似乎没有分区,并且事件没有相同数量的记录。考虑到这一点,我想到的解决方案是使用每个事件开始的位置来检索相应的值。

考虑到按时间戳排序,我们提取每一行的位置:

from pyspark.sql import Window
from pyspark.sql.functions import col, rank, collect_list, expr

df = (
  spark.createDataFrame(
    [
        { 'timestamp': '2021-02-02 01:03:55', 'col1': 's1' },
        { 'timestamp': '2021-02-02 01:04:16.952854', 'col1': 's1', 'col2': 'other_ind'},
        { 'timestamp': '2021-02-02 01:04:32.398155', 'col1': 's1'},
        { 'timestamp': '2021-02-02 01:04:53.793089', 'col1': 's1', 'col2': 'event_start_ind', 'col3': 'event_1_value'},
        { 'timestamp': '2021-02-02 01:05:10.936913', 'col1': 's1'},
        { 'timestamp': '2021-02-02 01:05:36', 'col1': 's1', 'col2': 'other_ind'},
        { 'timestamp': '2021-02-02 01:05:42', 'col1': 's1'},
        { 'timestamp': '2021-02-02 01:05:43', 'col1': 's1'},
        { 'timestamp': '2021-02-02 01:05:44', 'col1': 's1', 'col2': 'event_start_ind', 'col3': 'event_2_value'},
        { 'timestamp': '2021-02-02 01:05:46.623198', 'col1': 's1'},
        { 'timestamp': '2021-02-02 01:06:50', 'col1': 's1'},
        { 'timestamp': '2021-02-02 01:07:19.607685', 'col1': 's1'}
    ]
  )
  .withColumn('timestamp', col('timestamp').cast('timestamp'))
  .withColumn("line", rank().over(Window.orderBy("timestamp")))
)

df.show(truncate=False)
+----+--------------------------+---------------+-------------+----+
|col1|timestamp                 |col2           |col3         |line|
+----+--------------------------+---------------+-------------+----+
|s1  |2021-02-02 01:03:55       |null           |null         |1   |
|s1  |2021-02-02 01:04:16.952854|other_ind      |null         |2   |
|s1  |2021-02-02 01:04:32.398155|null           |null         |3   |
|s1  |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|4   |
|s1  |2021-02-02 01:05:10.936913|null           |null         |5   |
|s1  |2021-02-02 01:05:36       |other_ind      |null         |6   |
|s1  |2021-02-02 01:05:42       |null           |null         |7   |
|s1  |2021-02-02 01:05:43       |null           |null         |8   |
|s1  |2021-02-02 01:05:44       |event_start_ind|event_2_value|9   |
|s1  |2021-02-02 01:05:46.623198|null           |null         |10  |
|s1  |2021-02-02 01:06:50       |null           |null         |11  |
|s1  |2021-02-02 01:07:19.607685|null           |null         |12  |
+----+--------------------------+---------------+-------------+----+

之后我们确定每个事件的开始:

df_event_start = (
    df.filter(col("col2") == 'event_start_ind')
    .select(
        col("line").alias("event_start_line"),
        col("col3").alias("event_value")
    )
)
df_event_start.show()
+----------------+-------------+
|event_start_line|  event_value|
+----------------+-------------+
|               4|event_1_value|
|               9|event_2_value|
+----------------+-------------+

使用 event_start 信息查找下一个有效事件开始:

df_with_event_starts = (
    df.join(
        df_event_start.select(collect_list('event_start_line').alias("event_starts"))
    )
    .withColumn("next_valid_event", expr("element_at(filter(event_starts, x -> x >= line), 1)"))
)

df_with_event_starts.show(truncate=False)
+----+--------------------------+---------------+-------------+----+------------+----------------+
|col1|timestamp                 |col2           |col3         |line|event_starts|next_valid_event|
+----+--------------------------+---------------+-------------+----+------------+----------------+
|s1  |2021-02-02 01:03:55       |null           |null         |1   |[4, 9]      |4               |
|s1  |2021-02-02 01:04:16.952854|other_ind      |null         |2   |[4, 9]      |4               |
|s1  |2021-02-02 01:04:32.398155|null           |null         |3   |[4, 9]      |4               |
|s1  |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|4   |[4, 9]      |4               |
|s1  |2021-02-02 01:05:10.936913|null           |null         |5   |[4, 9]      |9               |
|s1  |2021-02-02 01:05:36       |other_ind      |null         |6   |[4, 9]      |9               |
|s1  |2021-02-02 01:05:42       |null           |null         |7   |[4, 9]      |9               |
|s1  |2021-02-02 01:05:43       |null           |null         |8   |[4, 9]      |9               |
|s1  |2021-02-02 01:05:44       |event_start_ind|event_2_value|9   |[4, 9]      |9               |
|s1  |2021-02-02 01:05:46.623198|null           |null         |10  |[4, 9]      |null            |
|s1  |2021-02-02 01:06:50       |null           |null         |11  |[4, 9]      |null            |
|s1  |2021-02-02 01:07:19.607685|null           |null         |12  |[4, 9]      |null            |
+----+--------------------------+---------------+-------------+----+------------+----------------+

最后检索到正确的值:

(
    df_with_event_starts.join(
        df_event_start,
        col("next_valid_event") == col("event_start_line"),
        how="left"
    )
    .drop("line", "event_starts", "next_valid_event", "event_start_line")
    .show(truncate=False)
)
+----+--------------------------+---------------+-------------+-------------+
|col1|timestamp                 |col2           |col3         |event_value  |
+----+--------------------------+---------------+-------------+-------------+
|s1  |2021-02-02 01:03:55       |null           |null         |event_1_value|
|s1  |2021-02-02 01:04:16.952854|other_ind      |null         |event_1_value|
|s1  |2021-02-02 01:04:32.398155|null           |null         |event_1_value|
|s1  |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|event_1_value|
|s1  |2021-02-02 01:05:10.936913|null           |null         |event_2_value|
|s1  |2021-02-02 01:05:36       |other_ind      |null         |event_2_value|
|s1  |2021-02-02 01:05:42       |null           |null         |event_2_value|
|s1  |2021-02-02 01:05:43       |null           |null         |event_2_value|
|s1  |2021-02-02 01:05:44       |event_start_ind|event_2_value|event_2_value|
|s1  |2021-02-02 01:05:46.623198|null           |null         |null         |
|s1  |2021-02-02 01:06:50       |null           |null         |null         |
|s1  |2021-02-02 01:07:19.607685|null           |null         |null         |
+----+--------------------------+---------------+-------------+-------------+

此解决方案会给您带来处理大批量的问题。 如果您能为每个事件找出一个密钥,我建议您使用 window 函数继续您的初始解决方案。如果发生这种情况,您可以测试 lastfirst sql 函数(忽略空值)。

希望有人能帮助您找到更好的解决方案。

提示:在问题中提供数据框创建脚本很有帮助。