将历史路径功能添加到 PySpark 数据框

Adding historical path feature to a PySpark dataframe

我的原始数据框中有 'Event' 列,我想添加其他 2 列。

Event Event_lag Hist_event
0 N N
0 0 N0
1 0 N00
0 1 N001
from pyspark.sql.functions import lag, col, monotonically_increasing_id, collect_list, concat_ws
from pyspark.sql import Window

#sample data
df= sc.parallelize([[0], [0], [1], [0]]).toDF(["Event"])

#add row index to the dataframe
df = df.withColumn("row_idx", monotonically_increasing_id())

w  = Window.orderBy("row_idx")

#add 'Event_Lag' column to the dataframe
df = df.withColumn("Event_Lag", lag(col('Event').cast('string')).over(w))
df = df.fillna({'Event_Lag':'N'})

#finally add 'Hist_Event' column to the dataframe and remove row index column (i.e. 'row_idx') to have the final result
df = df.withColumn("Hist_Event", collect_list(col('Event_Lag')).over(w)).\
        withColumn("Hist_Event", concat_ws("","Hist_Event")).\
        drop("row_idx")
df.show()

示例输入:

+-----+
|Event|
+-----+
|    0|
|    0|
|    1|
|    0|
+-----+

输出为:

+-----+---------+----------+
|Event|Event_Lag|Hist_Event|
+-----+---------+----------+
|    0|        N|         N|
|    0|        0|        N0|
|    1|        0|       N00|
|    0|        1|      N001|
+-----+---------+----------+