PySpark Filling Some Specific Missing Values

My Spark dataframe is:

Client  Date        Due_Day
A      2017-01-01   Null
A      2017-02-01   Null
A      2017-03-01   Null
A      2017-04-01   Null
A      2017-05-01   Null
A      2017-06-01   35
A      2017-07-01   Null
A      2017-08-01   Null
A      2017-09-01   Null
A      2017-10-01   Null
A      2017-11-01   Null
A      2017-12-01   Null
B      2017-01-01   Null
B      2017-02-01   Null
B      2017-03-01   Null
B      2017-04-01   Null
B      2017-05-01   Null
B      2017-06-01   Null
B      2017-07-01   Null
B      2017-08-01   Null
B      2017-09-01   Null
B      2017-10-01   78
B      2017-11-01   Null
B      2017-12-01   Null

Each client has exactly one non-null Due_Day in the dataframe.

The desired output is:

Client  Date    Due_Day    Result
A   2017-01-01  Null       Null
A   2017-02-01  Null       Null
A   2017-03-01  Null       Null
A   2017-04-01  Null       Null
A   2017-05-01  Null       Null         -> This month should remain Null
A   2017-06-01  35         35
A   2017-07-01  Null       Paid         -> The month after should be 'Paid'
A   2017-08-01  Null       OK           -> The following months should be 'OK'
A   2017-09-01  Null       OK
A   2017-10-01  Null       OK
A   2017-11-01  Null       OK
A   2017-12-01  Null       OK
B   2017-01-01  Null       Null
B   2017-02-01  Null       Null
B   2017-03-01  Null       Null
B   2017-04-01  Null       Null
B   2017-05-01  Null       Null
B   2017-06-01  Null       Null
B   2017-07-01  Null       Null
B   2017-08-01  Null       Null
B   2017-09-01  Null       Null
B   2017-10-01  78         78
B   2017-11-01  Null       Paid         -> The month after
B   2017-12-01  Null       OK

For each client, the month after the non-null Due_Day should be marked 'Paid', and the months after that should be marked 'OK' until the end of the year. The months before should remain Null.

Could you help me with the PySpark code?

The following may be a workable solution for you; I will try to explain the logic behind it:

  1. You have one value in the Due_Day column per client, so we do a forward fill in date order to carry that value into the following rows for the later calculation.
  2. Once the forward-filled column is in place, the rest is straightforward and we can write the underlying logic.

Create the DF here

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

# Small sample: one non-null value per client in col3
df = spark.createDataFrame(
    [("A", "2017-01-01", None), ("A", "2017-02-01", None), ("A", "2017-03-01", 35), ("A", "2017-04-01", None), ("A", "2017-05-01", None),
     ("B", "2017-01-01", None), ("B", "2017-02-01", 78), ("B", "2017-03-01", None), ("B", "2017-04-01", None), ("B", "2017-05-01", None)],
    ["col1", "col2", "col3"])
df.show(truncate=False)
+----+----------+----+
|col1|col2      |col3|
+----+----------+----+
|A   |2017-01-01|null|
|A   |2017-02-01|null|
|A   |2017-03-01|35  |
|A   |2017-04-01|null|
|A   |2017-05-01|null|
|B   |2017-01-01|null|
|B   |2017-02-01|78  |
|B   |2017-03-01|null|
|B   |2017-04-01|null|
|B   |2017-05-01|null|
+----+----------+----+

Forward fill here so that the value carries into the following rows

# Carry the last non-null col3 value forward within each client, ordered by date
w = W.partitionBy("col1").orderBy("col2")
df = df.withColumn("filled_col", F.last("col3", ignorenulls=True).over(w))
df.show()
+----+----------+----+----------+
|col1|      col2|col3|filled_col|
+----+----------+----+----------+
|   B|2017-01-01|null|      null|
|   B|2017-02-01|  78|        78|
|   B|2017-03-01|null|        78|
|   B|2017-04-01|null|        78|
|   B|2017-05-01|null|        78|
|   A|2017-01-01|null|      null|
|   A|2017-02-01|null|      null|
|   A|2017-03-01|  35|        35|
|   A|2017-04-01|null|        35|
|   A|2017-05-01|null|        35|
+----+----------+----+----------+
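The forward fill works because a window with an orderBy defaults to a frame that runs from the start of the partition up to the current row, so F.last("col3", ignorenulls=True) returns the most recent non-null value seen so far. If you prefer to spell that frame out, the following window should be equivalent (just a sketch, same result):

# Same forward fill with the frame made explicit
w_explicit = (W.partitionBy("col1").orderBy("col2")
              .rowsBetween(W.unboundedPreceding, W.currentRow))
df = df.withColumn("filled_col", F.last("col3", ignorenulls=True).over(w_explicit))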

We assign a row_number to each row within (col1, filled_col), because we already know that row number 2 in the non-null group is the month right after the payment ('Paid') and everything after it stays 'OK'. Rows whose filled_col is still null (the months before the payment) match neither condition, so their Result remains null.

w2 = W.partitionBy("col1", "filled_col").orderBy("col2")
df = df.withColumn("rnk", F.row_number().over(w2))
# rnk 2 in a non-null group -> 'Paid'; rnk > 2 -> 'OK'; everything else stays null
df = df.withColumn("Result",
    F.when((F.col("filled_col").isNotNull()) & (F.col("rnk") == 2), F.lit("Paid"))
     .when((F.col("filled_col").isNotNull()) & (F.col("rnk") > 2), F.lit("OK")))
df.show()
+----+----------+----+----------+---+------+
|col1|      col2|col3|filled_col|rnk|Result|
+----+----------+----+----------+---+------+
|   B|2017-01-01|null|      null|  1|  null|
|   B|2017-02-01|  78|        78|  1|  null|
|   B|2017-03-01|null|        78|  2|  Paid|
|   B|2017-04-01|null|        78|  3|    OK|
|   B|2017-05-01|null|        78|  4|    OK|
|   A|2017-01-01|null|      null|  1|  null|
|   A|2017-02-01|null|      null|  2|  null|
|   A|2017-03-01|  35|        35|  1|  null|
|   A|2017-04-01|null|        35|  2|  Paid|
|   A|2017-05-01|null|        35|  3|    OK|
+----+----------+----+----------+---+------+

Select the final columns of your choice

# For the payment month itself, fall back to the original col3 value
df = df.withColumn("Result", F.coalesce("Result", "col3"))
df.select("col1", "col2", "col3", "Result").orderBy("col1").show()
+----+----------+----+------+
|col1|      col2|col3|Result|
+----+----------+----+------+
|   A|2017-01-01|null|  null|
|   A|2017-02-01|null|  null|
|   A|2017-03-01|  35|    35|
|   A|2017-04-01|null|  Paid|
|   A|2017-05-01|null|    OK|
|   B|2017-01-01|null|  null|
|   B|2017-02-01|  78|    78|
|   B|2017-03-01|null|  Paid|
|   B|2017-04-01|null|    OK|
|   B|2017-05-01|null|    OK|
+----+----------+----+------+
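A quick recap on your original column names (Client, Date, Due_Day, taken from your dataframe above, so adjust if they differ). This assumes Date is either date-typed or a yyyy-MM-dd string, so ordering by it is chronological; the logic is the same as above, just renamed:

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

# 1) Forward-fill Due_Day per client, ordered by Date
w = W.partitionBy("Client").orderBy("Date")
out = df.withColumn("filled", F.last("Due_Day", ignorenulls=True).over(w))

# 2) Rank months inside each (Client, filled) group and derive Result
w2 = W.partitionBy("Client", "filled").orderBy("Date")
out = (out.withColumn("rnk", F.row_number().over(w2))
          .withColumn("Result",
                      F.when((F.col("filled").isNotNull()) & (F.col("rnk") == 2), F.lit("Paid"))
                       .when((F.col("filled").isNotNull()) & (F.col("rnk") > 2), F.lit("OK")))
          .withColumn("Result", F.coalesce("Result", "Due_Day"))
          .select("Client", "Date", "Due_Day", "Result"))
out.orderBy("Client", "Date").show()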