Pyspark-比较同一组内的行并根据比较制定新列

Pyspark- compare rows within the same group and formulate new columns based on the comparision

我遇到了一个棘手的情况,我正在尝试使用 pyspark 来解决同样的问题

输入数据帧中有两个唯一的 ID,class 要么是新的,要么是清晰的。 数据帧根据 Input_time.

升序排序

下面是创建结果数据框或 table 的逻辑。

对于一组唯一 ID

如果时间 1 的新行在时间 2 (time2 > time1) 已清除,则应将 uniqueid 设置为已清除,time_up 是新行的行,time_down 是'cleared' 行,和 class 'cleared' .

如果没有清除,并且有连续的新 class 相同 id ,则采用第 1 次出现,并增加新条目的 repeatCount。

如果任何已清除的都没有新的配对,则该行将被放弃。

输入:

+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name|   Input_time|  class|   time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
|  Unique-01|   2018-09-01|    new|2018-09-01|      null|          0|         0|
|  Unique-01|   2018-09-15|cleared|      null|2018-09-15|          0|         0|
|  Unique-01|   2018-09-16|    new|2018-09-16|      null|          0|         0|
|  Unique-01|   2018-09-27|cleared|      null|2018-09-27|          0|         0|
|  Unique-01|   2018-09-30|cleared|      null|2018-09-30|          0|         0|
|  Unique-02|   2018-09-21|    new|2018-09-21|      null|          0|         0|
|  Unique-02|   2018-09-28|cleared|      null|2018-09-28|          0|         0|
|  Unique-02|   2018-09-28|    new|2018-09-28|      null|          0|         0|
|  Unique-02|   2018-10-05|    new|2018-10-05|      null|          0|         0|
|  Unique-02|   2018-10-15|cleared|      null|2018-10-15|          0|         0|
|  Unique-02|   2018-10-15|    new|2018-10-15|      null|          0|         0|
+-----------+-------------+-------+----------+----------+-----------+----------+

结果:

+-----------+-------------+-------+----------+----------+-----------+----------+
|Unique_name|   Input_time|  class|   time_up| time_down|repeatCount|updatetime|
+-----------+-------------+-------+----------+----------+-----------+----------+
|  Unique-01|   2018-09-01|cleared|2018-09-01|2018-09-15|          0|         0|
|  Unique-01|   2018-09-16|cleared|2018-09-16|2018-09-27|          0|         0|
|  Unique-02|   2018-09-21|cleared|2018-09-21|2018-09-28|          0|         0|
|  Unique-02|   2018-10-28|cleared|2018-09-28|2018-10-15|          1|2018-10-05|
|  Unique-02|   2018-10-15|    new|2018-10-15|      null|          0|         0|
+-----------+-------------+-------+----------+----------+-----------+----------+

在这个例子中, 独特-01 第一行是新的(2018-09-01),(下一行稍后出现)被清除,所以这形成了一对,第一行应该更新为第二行的 class 和 time_down

然后在 2018-09-16 又是一个新的,在 2018-09-27 被清除 下一行是 2018-09-30 但没有 'new' 是 cleared/paired。所以这个就放弃了。

唯一-02 第一行是新的(2018-09-21),(下一行稍后出现)被清除,所以这形成一对并且第一行应该用第二行的 class 和 time_down(2018-09-28)

下一行(Input_time at 2018-09-28)是新的,下一行没有明确但下一行又是新的(2018-10-05),所以 repeatCount 现在增加到 1 并且 updatetime 是重复行的时间..

下一行已清除,因此重复 1 的新行现在已清除。

终于在2018-10-15有了unique-02,没有新的row/clear,现在是最新的

任何人都可以提出有关如何使用 pyspark 实现此目的的想法。

我确实尝试将数据帧转换为字典列表并进行迭代,但如果有 1000 行,这是一种非常耗时且效率低下的方法。

获得预期输出的一种方法是:

  1. 识别并合并连续的 new class 行并维护 repeatCountupdatetime
  2. 仅保留前 cleared class 行连续 cleared
  3. 对于第 2 步中的数据框,找到下一行的 class,如果 cleared 更新相关列
from pyspark.sql import functions as F
from pyspark.sql.functions import col as c, lit as l
from pyspark.sql import Window as W

data = [("Unique-01", "2018-09-01", "new", "2018-09-01", None,),
        ("Unique-01", "2018-09-15", "cleared", None, "2018-09-15",),
        ("Unique-01", "2018-09-16", "new", "2018-09-16", None,),
        ("Unique-01", "2018-09-27", "cleared", None, "2018-09-27",),
        ("Unique-01", "2018-09-30", "cleared", None, "2018-09-30",),
        ("Unique-02", "2018-09-21", "new", "2018-09-21", None,),
        ("Unique-02", "2018-09-28", "cleared", None, "2018-09-28",),
        ("Unique-02", "2018-09-28", "new", "2018-09-28", None,),
        ("Unique-02", "2018-10-05", "new", "2018-10-05", None,),
        ("Unique-02", "2018-10-15", "cleared", None, "2018-10-15",),
        ("Unique-02", "2018-10-15", "new", "2018-10-15", None,), ]

df = spark.createDataFrame(data, ("Unique_name", "Input_time", "class", "time_up", "time_down",))

time_columns = ["Input_time", "time_up", "time_down", ]

df = df.select(*[F.to_date(c).alias(c) if c in time_columns else c for c in df.columns])

# STEP 1 and 2
ws = W.partitionBy("Unique_name").orderBy("Input_time")
runs_df = (df.withColumn("prev_class", F.lag("class", 1).over(ws))
   .where(~((c("class") == c("prev_class")) & (c("class") == l("cleared"))))
   .withColumn("run_indicator", F.when(c("class") == c("prev_class"), l(0)).otherwise(l(1)))
   .withColumn("runs", F.sum("run_indicator").over(ws.rowsBetween(W.unboundedPreceding, W.currentRow)))
   .drop("run_indicator")
)

new_class_run_ws = W.partitionBy("Unique_name", "runs").orderBy("Input_time").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

merged_new_runs = (runs_df.withColumn("run_start_input_time", F.first("Input_time").over(new_class_run_ws))
        .withColumn("run_end_input_time", F.last("Input_time").over(new_class_run_ws))
        .withColumn("repeat_count", F.count("Input_time").over(new_class_run_ws) - l(1))
        .where("run_start_input_time == Input_time")
        .drop("prev_class", "runs", "run_start_input_time", )
)

# STEP 3

(merged_new_runs.withColumn("next_class", F.lead("class", 1).over(ws))
                .withColumn("next_time_down", F.lead("time_down", 1).over(ws))
                .where("class == 'new'")
                .withColumn("class", F.coalesce("next_class", "class"))
                .drop("time_down")
                .withColumnRenamed("next_time_down", "time_down")
                .withColumn("updatetime", F.when(c("repeat_count") > 0, c("run_end_input_time")).otherwise(l(None)))
                .select("Unique_name", "Input_time", "class", "time_up", "time_down", "repeat_count", "updatetime", )
).show()

输出

+-----------+----------+-------+----------+----------+------------+----------+
|Unique_name|Input_time|  class|   time_up| time_down|repeat_count|updatetime|
+-----------+----------+-------+----------+----------+------------+----------+
|  Unique-01|2018-09-01|cleared|2018-09-01|2018-09-15|           0|      null|
|  Unique-01|2018-09-16|cleared|2018-09-16|2018-09-27|           0|      null|
|  Unique-02|2018-09-21|cleared|2018-09-21|2018-09-28|           0|      null|
|  Unique-02|2018-09-28|cleared|2018-09-28|2018-10-15|           1|2018-10-05|
|  Unique-02|2018-10-15|    new|2018-10-15|      null|           0|      null|
+-----------+----------+-------+----------+----------+------------+----------+