在 pyspark 中创建查找列
Create a lookup column in pyspark
我正在尝试在 pyspark 数据框中创建一个新列,该列“查找”同一数据框中的下一个值,并将其复制到所有下一行,直到下一个事件发生。
我已经使用了如下的窗口函数,但仍然无法获得该列的下一个值:
condition = (col("col2") == 'event_start_ind')
w=Window.partitionBy("col2").orderBy(*[when(condition, lit(1)).desc()])
df.select(["timestamp",
"col1",
"col2",
"col3"
]).withColumn("col4", when(condition, lead("col3",1).over(w))) \
.orderBy("timestamp") \
.show(500, truncate=False)
显然它不会正确查找“下一个”事件。关于可能的方法有什么想法吗?
示例数据框为:
timestamp
col1
col2
col3
2021-02-02 01:03:55
s1
null
null
2021-02-02 01:04:16.952854
s1
other_ind
null
2021-02-02 01:04:32.398155
s1
null
null
2021-02-02 01:04:53.793089
s1
event_start_ind
event_1_value
2021-02-02 01:05:10.936913
s1
null
null
2021-02-02 01:05:36
s1
other_ind
null
2021-02-02 01:05:42
s1
null
null
2021-02-02 01:05:43
s1
null
null
2021-02-02 01:05:44
s1
event_start_ind
event_2_value
2021-02-02 01:05:46.623198
s1
null
null
2021-02-02 01:06:50
s1
null
null
2021-02-02 01:07:19.607685
s1
null
null
期望的结果是:
timestamp
col1
col2
col3
col4
2021-02-02 01:03:55
s1
null
null
event_1_value
2021-02-02 01:04:16.952854
s1
other_ind
null
event_1_value
2021-02-02 01:04:32.398155
s1
null
null
event_1_value
2021-02-02 01:04:53.793089
s1
event_start_ind
event_1_value
event_1_value
2021-02-02 01:05:10.936913
s1
null
null
event_2_value
2021-02-02 01:05:36
s1
other_ind
null
event_2_value
2021-02-02 01:05:42
s1
null
null
event_2_value
2021-02-02 01:05:43
s1
null
null
event_2_value
2021-02-02 01:05:44
s1
event_start_ind
event_2_value
event_2_value
2021-02-02 01:05:46.623198
s1
null
null
null
2021-02-02 01:06:50
s1
null
null
null
2021-02-02 01:07:19.607685
s1
null
null
null
您的 window 似乎没有分区,并且事件没有相同数量的记录。考虑到这一点,我想到的解决方案是使用每个事件开始的位置来检索相应的值。
考虑到按时间戳排序,我们提取每一行的位置:
from pyspark.sql import Window
from pyspark.sql.functions import col, rank, collect_list, expr
df = (
spark.createDataFrame(
[
{ 'timestamp': '2021-02-02 01:03:55', 'col1': 's1' },
{ 'timestamp': '2021-02-02 01:04:16.952854', 'col1': 's1', 'col2': 'other_ind'},
{ 'timestamp': '2021-02-02 01:04:32.398155', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:04:53.793089', 'col1': 's1', 'col2': 'event_start_ind', 'col3': 'event_1_value'},
{ 'timestamp': '2021-02-02 01:05:10.936913', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:05:36', 'col1': 's1', 'col2': 'other_ind'},
{ 'timestamp': '2021-02-02 01:05:42', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:05:43', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:05:44', 'col1': 's1', 'col2': 'event_start_ind', 'col3': 'event_2_value'},
{ 'timestamp': '2021-02-02 01:05:46.623198', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:06:50', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:07:19.607685', 'col1': 's1'}
]
)
.withColumn('timestamp', col('timestamp').cast('timestamp'))
.withColumn("line", rank().over(Window.orderBy("timestamp")))
)
df.show(truncate=False)
+----+--------------------------+---------------+-------------+----+
|col1|timestamp |col2 |col3 |line|
+----+--------------------------+---------------+-------------+----+
|s1 |2021-02-02 01:03:55 |null |null |1 |
|s1 |2021-02-02 01:04:16.952854|other_ind |null |2 |
|s1 |2021-02-02 01:04:32.398155|null |null |3 |
|s1 |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|4 |
|s1 |2021-02-02 01:05:10.936913|null |null |5 |
|s1 |2021-02-02 01:05:36 |other_ind |null |6 |
|s1 |2021-02-02 01:05:42 |null |null |7 |
|s1 |2021-02-02 01:05:43 |null |null |8 |
|s1 |2021-02-02 01:05:44 |event_start_ind|event_2_value|9 |
|s1 |2021-02-02 01:05:46.623198|null |null |10 |
|s1 |2021-02-02 01:06:50 |null |null |11 |
|s1 |2021-02-02 01:07:19.607685|null |null |12 |
+----+--------------------------+---------------+-------------+----+
之后我们确定每个事件的开始:
df_event_start = (
df.filter(col("col2") == 'event_start_ind')
.select(
col("line").alias("event_start_line"),
col("col3").alias("event_value")
)
)
df_event_start.show()
+----------------+-------------+
|event_start_line| event_value|
+----------------+-------------+
| 4|event_1_value|
| 9|event_2_value|
+----------------+-------------+
使用 event_start
信息查找下一个有效事件开始:
df_with_event_starts = (
df.join(
df_event_start.select(collect_list('event_start_line').alias("event_starts"))
)
.withColumn("next_valid_event", expr("element_at(filter(event_starts, x -> x >= line), 1)"))
)
df_with_event_starts.show(truncate=False)
+----+--------------------------+---------------+-------------+----+------------+----------------+
|col1|timestamp |col2 |col3 |line|event_starts|next_valid_event|
+----+--------------------------+---------------+-------------+----+------------+----------------+
|s1 |2021-02-02 01:03:55 |null |null |1 |[4, 9] |4 |
|s1 |2021-02-02 01:04:16.952854|other_ind |null |2 |[4, 9] |4 |
|s1 |2021-02-02 01:04:32.398155|null |null |3 |[4, 9] |4 |
|s1 |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|4 |[4, 9] |4 |
|s1 |2021-02-02 01:05:10.936913|null |null |5 |[4, 9] |9 |
|s1 |2021-02-02 01:05:36 |other_ind |null |6 |[4, 9] |9 |
|s1 |2021-02-02 01:05:42 |null |null |7 |[4, 9] |9 |
|s1 |2021-02-02 01:05:43 |null |null |8 |[4, 9] |9 |
|s1 |2021-02-02 01:05:44 |event_start_ind|event_2_value|9 |[4, 9] |9 |
|s1 |2021-02-02 01:05:46.623198|null |null |10 |[4, 9] |null |
|s1 |2021-02-02 01:06:50 |null |null |11 |[4, 9] |null |
|s1 |2021-02-02 01:07:19.607685|null |null |12 |[4, 9] |null |
+----+--------------------------+---------------+-------------+----+------------+----------------+
最后检索到正确的值:
(
df_with_event_starts.join(
df_event_start,
col("next_valid_event") == col("event_start_line"),
how="left"
)
.drop("line", "event_starts", "next_valid_event", "event_start_line")
.show(truncate=False)
)
+----+--------------------------+---------------+-------------+-------------+
|col1|timestamp |col2 |col3 |event_value |
+----+--------------------------+---------------+-------------+-------------+
|s1 |2021-02-02 01:03:55 |null |null |event_1_value|
|s1 |2021-02-02 01:04:16.952854|other_ind |null |event_1_value|
|s1 |2021-02-02 01:04:32.398155|null |null |event_1_value|
|s1 |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|event_1_value|
|s1 |2021-02-02 01:05:10.936913|null |null |event_2_value|
|s1 |2021-02-02 01:05:36 |other_ind |null |event_2_value|
|s1 |2021-02-02 01:05:42 |null |null |event_2_value|
|s1 |2021-02-02 01:05:43 |null |null |event_2_value|
|s1 |2021-02-02 01:05:44 |event_start_ind|event_2_value|event_2_value|
|s1 |2021-02-02 01:05:46.623198|null |null |null |
|s1 |2021-02-02 01:06:50 |null |null |null |
|s1 |2021-02-02 01:07:19.607685|null |null |null |
+----+--------------------------+---------------+-------------+-------------+
此解决方案会给您带来处理大批量的问题。
如果您能为每个事件找出一个密钥,我建议您使用 window 函数继续您的初始解决方案。如果发生这种情况,您可以测试 last
或 first
sql 函数(忽略空值)。
希望有人能帮助您找到更好的解决方案。
提示:在问题中提供数据框创建脚本很有帮助。
我正在尝试在 pyspark 数据框中创建一个新列,该列“查找”同一数据框中的下一个值,并将其复制到所有下一行,直到下一个事件发生。
我已经使用了如下的窗口函数,但仍然无法获得该列的下一个值:
condition = (col("col2") == 'event_start_ind')
w=Window.partitionBy("col2").orderBy(*[when(condition, lit(1)).desc()])
df.select(["timestamp",
"col1",
"col2",
"col3"
]).withColumn("col4", when(condition, lead("col3",1).over(w))) \
.orderBy("timestamp") \
.show(500, truncate=False)
显然它不会正确查找“下一个”事件。关于可能的方法有什么想法吗?
示例数据框为:
timestamp | col1 | col2 | col3 |
---|---|---|---|
2021-02-02 01:03:55 | s1 | null | null |
2021-02-02 01:04:16.952854 | s1 | other_ind | null |
2021-02-02 01:04:32.398155 | s1 | null | null |
2021-02-02 01:04:53.793089 | s1 | event_start_ind | event_1_value |
2021-02-02 01:05:10.936913 | s1 | null | null |
2021-02-02 01:05:36 | s1 | other_ind | null |
2021-02-02 01:05:42 | s1 | null | null |
2021-02-02 01:05:43 | s1 | null | null |
2021-02-02 01:05:44 | s1 | event_start_ind | event_2_value |
2021-02-02 01:05:46.623198 | s1 | null | null |
2021-02-02 01:06:50 | s1 | null | null |
2021-02-02 01:07:19.607685 | s1 | null | null |
期望的结果是:
timestamp | col1 | col2 | col3 | col4 |
---|---|---|---|---|
2021-02-02 01:03:55 | s1 | null | null | event_1_value |
2021-02-02 01:04:16.952854 | s1 | other_ind | null | event_1_value |
2021-02-02 01:04:32.398155 | s1 | null | null | event_1_value |
2021-02-02 01:04:53.793089 | s1 | event_start_ind | event_1_value | event_1_value |
2021-02-02 01:05:10.936913 | s1 | null | null | event_2_value |
2021-02-02 01:05:36 | s1 | other_ind | null | event_2_value |
2021-02-02 01:05:42 | s1 | null | null | event_2_value |
2021-02-02 01:05:43 | s1 | null | null | event_2_value |
2021-02-02 01:05:44 | s1 | event_start_ind | event_2_value | event_2_value |
2021-02-02 01:05:46.623198 | s1 | null | null | null |
2021-02-02 01:06:50 | s1 | null | null | null |
2021-02-02 01:07:19.607685 | s1 | null | null | null |
您的 window 似乎没有分区,并且事件没有相同数量的记录。考虑到这一点,我想到的解决方案是使用每个事件开始的位置来检索相应的值。
考虑到按时间戳排序,我们提取每一行的位置:
from pyspark.sql import Window
from pyspark.sql.functions import col, rank, collect_list, expr
df = (
spark.createDataFrame(
[
{ 'timestamp': '2021-02-02 01:03:55', 'col1': 's1' },
{ 'timestamp': '2021-02-02 01:04:16.952854', 'col1': 's1', 'col2': 'other_ind'},
{ 'timestamp': '2021-02-02 01:04:32.398155', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:04:53.793089', 'col1': 's1', 'col2': 'event_start_ind', 'col3': 'event_1_value'},
{ 'timestamp': '2021-02-02 01:05:10.936913', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:05:36', 'col1': 's1', 'col2': 'other_ind'},
{ 'timestamp': '2021-02-02 01:05:42', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:05:43', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:05:44', 'col1': 's1', 'col2': 'event_start_ind', 'col3': 'event_2_value'},
{ 'timestamp': '2021-02-02 01:05:46.623198', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:06:50', 'col1': 's1'},
{ 'timestamp': '2021-02-02 01:07:19.607685', 'col1': 's1'}
]
)
.withColumn('timestamp', col('timestamp').cast('timestamp'))
.withColumn("line", rank().over(Window.orderBy("timestamp")))
)
df.show(truncate=False)
+----+--------------------------+---------------+-------------+----+
|col1|timestamp |col2 |col3 |line|
+----+--------------------------+---------------+-------------+----+
|s1 |2021-02-02 01:03:55 |null |null |1 |
|s1 |2021-02-02 01:04:16.952854|other_ind |null |2 |
|s1 |2021-02-02 01:04:32.398155|null |null |3 |
|s1 |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|4 |
|s1 |2021-02-02 01:05:10.936913|null |null |5 |
|s1 |2021-02-02 01:05:36 |other_ind |null |6 |
|s1 |2021-02-02 01:05:42 |null |null |7 |
|s1 |2021-02-02 01:05:43 |null |null |8 |
|s1 |2021-02-02 01:05:44 |event_start_ind|event_2_value|9 |
|s1 |2021-02-02 01:05:46.623198|null |null |10 |
|s1 |2021-02-02 01:06:50 |null |null |11 |
|s1 |2021-02-02 01:07:19.607685|null |null |12 |
+----+--------------------------+---------------+-------------+----+
之后我们确定每个事件的开始:
df_event_start = (
df.filter(col("col2") == 'event_start_ind')
.select(
col("line").alias("event_start_line"),
col("col3").alias("event_value")
)
)
df_event_start.show()
+----------------+-------------+
|event_start_line| event_value|
+----------------+-------------+
| 4|event_1_value|
| 9|event_2_value|
+----------------+-------------+
使用 event_start
信息查找下一个有效事件开始:
df_with_event_starts = (
df.join(
df_event_start.select(collect_list('event_start_line').alias("event_starts"))
)
.withColumn("next_valid_event", expr("element_at(filter(event_starts, x -> x >= line), 1)"))
)
df_with_event_starts.show(truncate=False)
+----+--------------------------+---------------+-------------+----+------------+----------------+
|col1|timestamp |col2 |col3 |line|event_starts|next_valid_event|
+----+--------------------------+---------------+-------------+----+------------+----------------+
|s1 |2021-02-02 01:03:55 |null |null |1 |[4, 9] |4 |
|s1 |2021-02-02 01:04:16.952854|other_ind |null |2 |[4, 9] |4 |
|s1 |2021-02-02 01:04:32.398155|null |null |3 |[4, 9] |4 |
|s1 |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|4 |[4, 9] |4 |
|s1 |2021-02-02 01:05:10.936913|null |null |5 |[4, 9] |9 |
|s1 |2021-02-02 01:05:36 |other_ind |null |6 |[4, 9] |9 |
|s1 |2021-02-02 01:05:42 |null |null |7 |[4, 9] |9 |
|s1 |2021-02-02 01:05:43 |null |null |8 |[4, 9] |9 |
|s1 |2021-02-02 01:05:44 |event_start_ind|event_2_value|9 |[4, 9] |9 |
|s1 |2021-02-02 01:05:46.623198|null |null |10 |[4, 9] |null |
|s1 |2021-02-02 01:06:50 |null |null |11 |[4, 9] |null |
|s1 |2021-02-02 01:07:19.607685|null |null |12 |[4, 9] |null |
+----+--------------------------+---------------+-------------+----+------------+----------------+
最后检索到正确的值:
(
df_with_event_starts.join(
df_event_start,
col("next_valid_event") == col("event_start_line"),
how="left"
)
.drop("line", "event_starts", "next_valid_event", "event_start_line")
.show(truncate=False)
)
+----+--------------------------+---------------+-------------+-------------+
|col1|timestamp |col2 |col3 |event_value |
+----+--------------------------+---------------+-------------+-------------+
|s1 |2021-02-02 01:03:55 |null |null |event_1_value|
|s1 |2021-02-02 01:04:16.952854|other_ind |null |event_1_value|
|s1 |2021-02-02 01:04:32.398155|null |null |event_1_value|
|s1 |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|event_1_value|
|s1 |2021-02-02 01:05:10.936913|null |null |event_2_value|
|s1 |2021-02-02 01:05:36 |other_ind |null |event_2_value|
|s1 |2021-02-02 01:05:42 |null |null |event_2_value|
|s1 |2021-02-02 01:05:43 |null |null |event_2_value|
|s1 |2021-02-02 01:05:44 |event_start_ind|event_2_value|event_2_value|
|s1 |2021-02-02 01:05:46.623198|null |null |null |
|s1 |2021-02-02 01:06:50 |null |null |null |
|s1 |2021-02-02 01:07:19.607685|null |null |null |
+----+--------------------------+---------------+-------------+-------------+
此解决方案会给您带来处理大批量的问题。
如果您能为每个事件找出一个密钥,我建议您使用 window 函数继续您的初始解决方案。如果发生这种情况,您可以测试 last
或 first
sql 函数(忽略空值)。
希望有人能帮助您找到更好的解决方案。
提示:在问题中提供数据框创建脚本很有帮助。