PySpark - window 函数产生新列

PySpark - window function results in new column

我有以下 PySpark 数据:

+---------+---------+---------+-------------------+
|event_id |user_id  |   status|         created_at|
+---------+---------+---------+-------------------+
|        1|        2|        a|2017-05-26 15:12:54|
|        1|        2|        b|2017-05-26 15:12:53|
|        2|        1|        a|2017-05-26 15:12:56|
|        1|        2|        b|2017-05-26 16:12:57|
|        2|        1|        c|2017-05-26 16:12:58|
|        2|        1|        b|2017-05-26 16:12:58|
|        3|        1|        b|2017-05-26 14:17:58|
+---------+---------+---------+-------------------+

对于每一对 (event_id, user_id)(这是主键,数据是从数据库中提取的)我想为每个 status 创建基于最高 created_at 的新列,其中 null 没有数据的对的值。对于以上数据:

+---------+---------+-------------------+-------------------+-------------------+
|event_id |user_id  |                  a|                  b|                  c|
+---------+---------+-------------------+-------------------+-------------------+
|        1|        2|2017-05-26 15:12:54|2017-05-26 16:12:57|               null|
|        2|        1|2017-05-26 15:12:56|               null|2017-05-26 16:12:58|
|        3|        1|               null|2017-05-26 14:17:58|               null|
+---------+---------+-------------------+-------------------+-------------------+

我的解决方案非常复杂、缓慢,我很确定它可以优化:

for status in ["a", "b", "c"]:
    df2 = df.filter(F.col("status") == status).groupBy(["event_id", "user_id"]).agg(F.max("created_at").alias(status))
    df = (
        df
        .join(
            df2, 
            on=(
                (df["event_id"] == df2["event_id"]) & 
                (df["user_id"] == df2["user_id"]) & 
                (df["status"] == status)
            ),
            how="left_outer"
        )
        .select(df["*"], status)
    )

df2 = (
    df
    .drop("status", "created_at")
    .groupBy(["event_id", "user_id"])
    .agg(F.max("a").alias("a"), F.max("b").alias("b"), F.max("c").alias("c"))
)

# df2 has the result

我能否在此处避免循环中的 JOIN,或者至少将 JOIN + groupBy 和 max 减少到一步?就像现在一样,我只是按顺序处理状态,这根本无法扩展。

试试这个,

df.groupBy("event_id","user_id").pivot("status").agg(first("created_at")).show