PySpark - window 函数产生新列
PySpark - window function results in new column
我有以下 PySpark 数据:
+---------+---------+---------+-------------------+
|event_id |user_id | status| created_at|
+---------+---------+---------+-------------------+
| 1| 2| a|2017-05-26 15:12:54|
| 1| 2| b|2017-05-26 15:12:53|
| 2| 1| a|2017-05-26 15:12:56|
| 1| 2| b|2017-05-26 16:12:57|
| 2| 1| c|2017-05-26 16:12:58|
| 2| 1| b|2017-05-26 16:12:58|
| 3| 1| b|2017-05-26 14:17:58|
+---------+---------+---------+-------------------+
对于每一对 (event_id, user_id)
(这是主键,数据是从数据库中提取的)我想为每个 status
创建基于最高 created_at
的新列,其中 null
没有数据的对的值。对于以上数据:
+---------+---------+-------------------+-------------------+-------------------+
|event_id |user_id | a| b| c|
+---------+---------+-------------------+-------------------+-------------------+
| 1| 2|2017-05-26 15:12:54|2017-05-26 16:12:57| null|
| 2| 1|2017-05-26 15:12:56| null|2017-05-26 16:12:58|
| 3| 1| null|2017-05-26 14:17:58| null|
+---------+---------+-------------------+-------------------+-------------------+
我的解决方案非常复杂、缓慢,我很确定它可以优化:
for status in ["a", "b", "c"]:
df2 = df.filter(F.col("status") == status).groupBy(["event_id", "user_id"]).agg(F.max("created_at").alias(status))
df = (
df
.join(
df2,
on=(
(df["event_id"] == df2["event_id"]) &
(df["user_id"] == df2["user_id"]) &
(df["status"] == status)
),
how="left_outer"
)
.select(df["*"], status)
)
df2 = (
df
.drop("status", "created_at")
.groupBy(["event_id", "user_id"])
.agg(F.max("a").alias("a"), F.max("b").alias("b"), F.max("c").alias("c"))
)
# df2 has the result
我能否在此处避免循环中的 JOIN,或者至少将 JOIN + groupBy 和 max 减少到一步?就像现在一样,我只是按顺序处理状态,这根本无法扩展。
试试这个,
df.groupBy("event_id","user_id").pivot("status").agg(first("created_at")).show
我有以下 PySpark 数据:
+---------+---------+---------+-------------------+
|event_id |user_id | status| created_at|
+---------+---------+---------+-------------------+
| 1| 2| a|2017-05-26 15:12:54|
| 1| 2| b|2017-05-26 15:12:53|
| 2| 1| a|2017-05-26 15:12:56|
| 1| 2| b|2017-05-26 16:12:57|
| 2| 1| c|2017-05-26 16:12:58|
| 2| 1| b|2017-05-26 16:12:58|
| 3| 1| b|2017-05-26 14:17:58|
+---------+---------+---------+-------------------+
对于每一对 (event_id, user_id)
(这是主键,数据是从数据库中提取的)我想为每个 status
创建基于最高 created_at
的新列,其中 null
没有数据的对的值。对于以上数据:
+---------+---------+-------------------+-------------------+-------------------+
|event_id |user_id | a| b| c|
+---------+---------+-------------------+-------------------+-------------------+
| 1| 2|2017-05-26 15:12:54|2017-05-26 16:12:57| null|
| 2| 1|2017-05-26 15:12:56| null|2017-05-26 16:12:58|
| 3| 1| null|2017-05-26 14:17:58| null|
+---------+---------+-------------------+-------------------+-------------------+
我的解决方案非常复杂、缓慢,我很确定它可以优化:
for status in ["a", "b", "c"]:
df2 = df.filter(F.col("status") == status).groupBy(["event_id", "user_id"]).agg(F.max("created_at").alias(status))
df = (
df
.join(
df2,
on=(
(df["event_id"] == df2["event_id"]) &
(df["user_id"] == df2["user_id"]) &
(df["status"] == status)
),
how="left_outer"
)
.select(df["*"], status)
)
df2 = (
df
.drop("status", "created_at")
.groupBy(["event_id", "user_id"])
.agg(F.max("a").alias("a"), F.max("b").alias("b"), F.max("c").alias("c"))
)
# df2 has the result
我能否在此处避免循环中的 JOIN,或者至少将 JOIN + groupBy 和 max 减少到一步?就像现在一样,我只是按顺序处理状态,这根本无法扩展。
试试这个,
df.groupBy("event_id","user_id").pivot("status").agg(first("created_at")).show