Pyspark-比较同一组内的行并根据比较制定新列
Pyspark- compare rows within the same group and formulate new columns based on the comparision
我遇到了一个棘手的情况,我正在尝试使用 pyspark 来解决同样的问题
输入数据帧中有两个唯一的 ID,class 要么是新的,要么是清晰的。
数据帧根据 Input_time.
升序排序
下面是创建结果数据框或 table 的逻辑。
对于一组唯一 ID
如果时间 1 的新行在时间 2 (time2 > time1) 已清除,则应将 uniqueid 设置为已清除,time_up 是新行的行,time_down 是'cleared' 行,和 class 'cleared' .
如果没有清除,并且有连续的新 class 相同 id ,则采用第 1 次出现,并增加新条目的 repeatCount。
如果任何已清除的都没有新的配对,则该行将被放弃。
输入:
+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name| Input_time| class| time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
| Unique-01| 2018-09-01| new|2018-09-01| null| 0| 0|
| Unique-01| 2018-09-15|cleared| null|2018-09-15| 0| 0|
| Unique-01| 2018-09-16| new|2018-09-16| null| 0| 0|
| Unique-01| 2018-09-27|cleared| null|2018-09-27| 0| 0|
| Unique-01| 2018-09-30|cleared| null|2018-09-30| 0| 0|
| Unique-02| 2018-09-21| new|2018-09-21| null| 0| 0|
| Unique-02| 2018-09-28|cleared| null|2018-09-28| 0| 0|
| Unique-02| 2018-09-28| new|2018-09-28| null| 0| 0|
| Unique-02| 2018-10-05| new|2018-10-05| null| 0| 0|
| Unique-02| 2018-10-15|cleared| null|2018-10-15| 0| 0|
| Unique-02| 2018-10-15| new|2018-10-15| null| 0| 0|
+-----------+-------------+-------+----------+----------+-----------+----------+
结果:
+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name| Input_time| class| time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
| Unique-01| 2018-09-01|cleared|2018-09-01|2018-09-15| 0| 0|
| Unique-01| 2018-09-16|cleared|2018-09-16|2018-09-27| 0| 0|
| Unique-02| 2018-09-21|cleared|2018-09-21|2018-09-28| 0| 0|
| Unique-02| 2018-10-28|cleared|2018-09-28|2018-10-15| 1|2018-10-05|
| Unique-02| 2018-10-15| new|2018-10-15| null| 0| 0|
+-----------+-------------+-------+----------+----------+-----------+----------+
在这个例子中,
独特-01
第一行是新的(2018-09-01),(下一行稍后出现)被清除,所以这形成了一对,第一行应该更新为第二行的 class 和 time_down
然后在 2018-09-16 又是一个新的,在 2018-09-27 被清除
下一行是 2018-09-30 但没有 'new' 是 cleared/paired。所以这个就放弃了。
唯一-02
第一行是新的(2018-09-21),(下一行稍后出现)被清除,所以这形成一对并且第一行应该用第二行的 class 和 time_down(2018-09-28)
下一行(Input_time at 2018-09-28)是新的,下一行没有明确但下一行又是新的(2018-10-05),所以 repeatCount 现在增加到 1 并且 updatetime 是重复行的时间..
下一行已清除,因此重复 1 的新行现在已清除。
终于在2018-10-15有了unique-02,没有新的row/clear,现在是最新的
任何人都可以提出有关如何使用 pyspark 实现此目的的想法。
我确实尝试将数据帧转换为字典列表并进行迭代,但如果有 1000 行,这是一种非常耗时且效率低下的方法。
获得预期输出的一种方法是:
- 识别并合并连续的
new
class 行并维护 repeatCount
和 updatetime
- 仅保留前
cleared
class 行连续 cleared
行
- 对于第 2 步中的数据框,找到下一行的 class,如果
cleared
更新相关列
from pyspark.sql import functions as F
from pyspark.sql.functions import col as c, lit as l
from pyspark.sql import Window as W
data = [("Unique-01", "2018-09-01", "new", "2018-09-01", None,),
("Unique-01", "2018-09-15", "cleared", None, "2018-09-15",),
("Unique-01", "2018-09-16", "new", "2018-09-16", None,),
("Unique-01", "2018-09-27", "cleared", None, "2018-09-27",),
("Unique-01", "2018-09-30", "cleared", None, "2018-09-30",),
("Unique-02", "2018-09-21", "new", "2018-09-21", None,),
("Unique-02", "2018-09-28", "cleared", None, "2018-09-28",),
("Unique-02", "2018-09-28", "new", "2018-09-28", None,),
("Unique-02", "2018-10-05", "new", "2018-10-05", None,),
("Unique-02", "2018-10-15", "cleared", None, "2018-10-15",),
("Unique-02", "2018-10-15", "new", "2018-10-15", None,), ]
df = spark.createDataFrame(data, ("Unique_name", "Input_time", "class", "time_up", "time_down",))
time_columns = ["Input_time", "time_up", "time_down", ]
df = df.select(*[F.to_date(c).alias(c) if c in time_columns else c for c in df.columns])
# STEP 1 and 2
ws = W.partitionBy("Unique_name").orderBy("Input_time")
runs_df = (df.withColumn("prev_class", F.lag("class", 1).over(ws))
.where(~((c("class") == c("prev_class")) & (c("class") == l("cleared"))))
.withColumn("run_indicator", F.when(c("class") == c("prev_class"), l(0)).otherwise(l(1)))
.withColumn("runs", F.sum("run_indicator").over(ws.rowsBetween(W.unboundedPreceding, W.currentRow)))
.drop("run_indicator")
)
new_class_run_ws = W.partitionBy("Unique_name", "runs").orderBy("Input_time").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
merged_new_runs = (runs_df.withColumn("run_start_input_time", F.first("Input_time").over(new_class_run_ws))
.withColumn("run_end_input_time", F.last("Input_time").over(new_class_run_ws))
.withColumn("repeat_count", F.count("Input_time").over(new_class_run_ws) - l(1))
.where("run_start_input_time == Input_time")
.drop("prev_class", "runs", "run_start_input_time", )
)
# STEP 3
(merged_new_runs.withColumn("next_class", F.lead("class", 1).over(ws))
.withColumn("next_time_down", F.lead("time_down", 1).over(ws))
.where("class == 'new'")
.withColumn("class", F.coalesce("next_class", "class"))
.drop("time_down")
.withColumnRenamed("next_time_down", "time_down")
.withColumn("updatetime", F.when(c("repeat_count") > 0, c("run_end_input_time")).otherwise(l(None)))
.select("Unique_name", "Input_time", "class", "time_up", "time_down", "repeat_count", "updatetime", )
).show()
输出
+-----------+----------+-------+----------+----------+------------+----------+
|Unique_name|Input_time| class| time_up| time_down|repeat_count|updatetime|
+-----------+----------+-------+----------+----------+------------+----------+
| Unique-01|2018-09-01|cleared|2018-09-01|2018-09-15| 0| null|
| Unique-01|2018-09-16|cleared|2018-09-16|2018-09-27| 0| null|
| Unique-02|2018-09-21|cleared|2018-09-21|2018-09-28| 0| null|
| Unique-02|2018-09-28|cleared|2018-09-28|2018-10-15| 1|2018-10-05|
| Unique-02|2018-10-15| new|2018-10-15| null| 0| null|
+-----------+----------+-------+----------+----------+------------+----------+
我遇到了一个棘手的情况,我正在尝试使用 pyspark 来解决同样的问题
输入数据帧中有两个唯一的 ID,class 要么是新的,要么是清晰的。 数据帧根据 Input_time.
升序排序下面是创建结果数据框或 table 的逻辑。
对于一组唯一 ID
如果时间 1 的新行在时间 2 (time2 > time1) 已清除,则应将 uniqueid 设置为已清除,time_up 是新行的行,time_down 是'cleared' 行,和 class 'cleared' .
如果没有清除,并且有连续的新 class 相同 id ,则采用第 1 次出现,并增加新条目的 repeatCount。
如果任何已清除的都没有新的配对,则该行将被放弃。
输入:
+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name| Input_time| class| time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
| Unique-01| 2018-09-01| new|2018-09-01| null| 0| 0|
| Unique-01| 2018-09-15|cleared| null|2018-09-15| 0| 0|
| Unique-01| 2018-09-16| new|2018-09-16| null| 0| 0|
| Unique-01| 2018-09-27|cleared| null|2018-09-27| 0| 0|
| Unique-01| 2018-09-30|cleared| null|2018-09-30| 0| 0|
| Unique-02| 2018-09-21| new|2018-09-21| null| 0| 0|
| Unique-02| 2018-09-28|cleared| null|2018-09-28| 0| 0|
| Unique-02| 2018-09-28| new|2018-09-28| null| 0| 0|
| Unique-02| 2018-10-05| new|2018-10-05| null| 0| 0|
| Unique-02| 2018-10-15|cleared| null|2018-10-15| 0| 0|
| Unique-02| 2018-10-15| new|2018-10-15| null| 0| 0|
+-----------+-------------+-------+----------+----------+-----------+----------+
结果:
+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name| Input_time| class| time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
| Unique-01| 2018-09-01|cleared|2018-09-01|2018-09-15| 0| 0|
| Unique-01| 2018-09-16|cleared|2018-09-16|2018-09-27| 0| 0|
| Unique-02| 2018-09-21|cleared|2018-09-21|2018-09-28| 0| 0|
| Unique-02| 2018-10-28|cleared|2018-09-28|2018-10-15| 1|2018-10-05|
| Unique-02| 2018-10-15| new|2018-10-15| null| 0| 0|
+-----------+-------------+-------+----------+----------+-----------+----------+
在这个例子中, 独特-01 第一行是新的(2018-09-01),(下一行稍后出现)被清除,所以这形成了一对,第一行应该更新为第二行的 class 和 time_down
然后在 2018-09-16 又是一个新的,在 2018-09-27 被清除 下一行是 2018-09-30 但没有 'new' 是 cleared/paired。所以这个就放弃了。
唯一-02 第一行是新的(2018-09-21),(下一行稍后出现)被清除,所以这形成一对并且第一行应该用第二行的 class 和 time_down(2018-09-28)
下一行(Input_time at 2018-09-28)是新的,下一行没有明确但下一行又是新的(2018-10-05),所以 repeatCount 现在增加到 1 并且 updatetime 是重复行的时间..
下一行已清除,因此重复 1 的新行现在已清除。
终于在2018-10-15有了unique-02,没有新的row/clear,现在是最新的
任何人都可以提出有关如何使用 pyspark 实现此目的的想法。
我确实尝试将数据帧转换为字典列表并进行迭代,但如果有 1000 行,这是一种非常耗时且效率低下的方法。
获得预期输出的一种方法是:
- 识别并合并连续的
new
class 行并维护repeatCount
和updatetime
- 仅保留前
cleared
class 行连续cleared
行 - 对于第 2 步中的数据框,找到下一行的 class,如果
cleared
更新相关列
from pyspark.sql import functions as F
from pyspark.sql.functions import col as c, lit as l
from pyspark.sql import Window as W
data = [("Unique-01", "2018-09-01", "new", "2018-09-01", None,),
("Unique-01", "2018-09-15", "cleared", None, "2018-09-15",),
("Unique-01", "2018-09-16", "new", "2018-09-16", None,),
("Unique-01", "2018-09-27", "cleared", None, "2018-09-27",),
("Unique-01", "2018-09-30", "cleared", None, "2018-09-30",),
("Unique-02", "2018-09-21", "new", "2018-09-21", None,),
("Unique-02", "2018-09-28", "cleared", None, "2018-09-28",),
("Unique-02", "2018-09-28", "new", "2018-09-28", None,),
("Unique-02", "2018-10-05", "new", "2018-10-05", None,),
("Unique-02", "2018-10-15", "cleared", None, "2018-10-15",),
("Unique-02", "2018-10-15", "new", "2018-10-15", None,), ]
df = spark.createDataFrame(data, ("Unique_name", "Input_time", "class", "time_up", "time_down",))
time_columns = ["Input_time", "time_up", "time_down", ]
df = df.select(*[F.to_date(c).alias(c) if c in time_columns else c for c in df.columns])
# STEP 1 and 2
ws = W.partitionBy("Unique_name").orderBy("Input_time")
runs_df = (df.withColumn("prev_class", F.lag("class", 1).over(ws))
.where(~((c("class") == c("prev_class")) & (c("class") == l("cleared"))))
.withColumn("run_indicator", F.when(c("class") == c("prev_class"), l(0)).otherwise(l(1)))
.withColumn("runs", F.sum("run_indicator").over(ws.rowsBetween(W.unboundedPreceding, W.currentRow)))
.drop("run_indicator")
)
new_class_run_ws = W.partitionBy("Unique_name", "runs").orderBy("Input_time").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
merged_new_runs = (runs_df.withColumn("run_start_input_time", F.first("Input_time").over(new_class_run_ws))
.withColumn("run_end_input_time", F.last("Input_time").over(new_class_run_ws))
.withColumn("repeat_count", F.count("Input_time").over(new_class_run_ws) - l(1))
.where("run_start_input_time == Input_time")
.drop("prev_class", "runs", "run_start_input_time", )
)
# STEP 3
(merged_new_runs.withColumn("next_class", F.lead("class", 1).over(ws))
.withColumn("next_time_down", F.lead("time_down", 1).over(ws))
.where("class == 'new'")
.withColumn("class", F.coalesce("next_class", "class"))
.drop("time_down")
.withColumnRenamed("next_time_down", "time_down")
.withColumn("updatetime", F.when(c("repeat_count") > 0, c("run_end_input_time")).otherwise(l(None)))
.select("Unique_name", "Input_time", "class", "time_up", "time_down", "repeat_count", "updatetime", )
).show()
输出
+-----------+----------+-------+----------+----------+------------+----------+
|Unique_name|Input_time| class| time_up| time_down|repeat_count|updatetime|
+-----------+----------+-------+----------+----------+------------+----------+
| Unique-01|2018-09-01|cleared|2018-09-01|2018-09-15| 0| null|
| Unique-01|2018-09-16|cleared|2018-09-16|2018-09-27| 0| null|
| Unique-02|2018-09-21|cleared|2018-09-21|2018-09-28| 0| null|
| Unique-02|2018-09-28|cleared|2018-09-28|2018-10-15| 1|2018-10-05|
| Unique-02|2018-10-15| new|2018-10-15| null| 0| null|
+-----------+----------+-------+----------+----------+------------+----------+