PySpark Filling Some Specific Missing Values
My Spark dataframe is:
Client Date Due_Day
A 2017-01-01 Null
A 2017-02-01 Null
A 2017-03-01 Null
A 2017-04-01 Null
A 2017-05-01 Null
A 2017-06-01 35
A 2017-07-01 Null
A 2017-08-01 Null
A 2017-09-01 Null
A 2017-10-01 Null
A 2017-11-01 Null
A 2017-12-01 Null
B 2017-01-01 Null
B 2017-02-01 Null
B 2017-03-01 Null
B 2017-04-01 Null
B 2017-05-01 Null
B 2017-06-01 Null
B 2017-07-01 Null
B 2017-08-01 Null
B 2017-09-01 Null
B 2017-10-01 78
B 2017-11-01 Null
B 2017-12-01 Null
Each client has exactly one non-null Due_Day in the dataframe.
The desired output is:
Client Date Due_Day Result
A 2017-01-01 Null Null
A 2017-02-01 Null Null
A 2017-03-01 Null Null
A 2017-04-01 Null Null
A 2017-05-01 Null Null -> This month should remain Null
A 2017-06-01 35 35
A 2017-07-01 Null Paid -> The month after should be 'Paid'
A 2017-08-01 Null OK -> The following months should be 'OK'
A 2017-09-01 Null OK
A 2017-10-01 Null OK
A 2017-11-01 Null OK
A 2017-12-01 Null OK
B 2017-01-01 Null Null
B 2017-02-01 Null Null
B 2017-03-01 Null Null
B 2017-04-01 Null Null
B 2017-05-01 Null Null
B 2017-06-01 Null Null
B 2017-07-01 Null Null
B 2017-08-01 Null Null
B 2017-09-01 Null Null
B 2017-10-01 78 78
B 2017-11-01 Null Paid -> The month after
B 2017-12-01 Null OK
For a client, the month after the non-null Due_Day should be marked 'Paid', and the months after that should be marked 'OK' until the end of the year. The earlier months should remain Null.
Could you help me with the PySpark code?
Below is a possible working solution for you; I will try to explain the logic behind it:
- You have a few values in the due_day column, and we forward-fill them in order so that the following rows carry the same value for the later calculation.
- Once the forward_fill column is in place, the rest becomes simple and we can write the remaining logic on top of it.
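All of the snippets below assume the usual PySpark imports (matching the F and W aliases used in the code) and an active SparkSession; a minimal setup, shown here only as an assumption, would be:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

# Assumed setup: any existing SparkSession works; this just creates a local one
spark = SparkSession.builder.appName("fill-specific-missing-values").getOrCreate()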
Create the DF here:
df = spark.createDataFrame(
    [("A","2017-01-01", None), ("A","2017-02-01", None), ("A","2017-03-01", 35), ("A","2017-04-01", None), ("A","2017-05-01", None),
     ("B","2017-01-01", None), ("B","2017-02-01", 78), ("B","2017-03-01", None), ("B","2017-04-01", None), ("B","2017-05-01", None)],
    ["col1", "col2", "col3"])
df.show(truncate=False)
+----+----------+----+
|col1|col2 |col3|
+----+----------+----+
|A |2017-01-01|null|
|A |2017-02-01|null|
|A |2017-03-01|35 |
|A |2017-04-01|null|
|A |2017-05-01|null|
|B |2017-01-01|null|
|B |2017-02-01|78 |
|B |2017-03-01|null|
|B |2017-04-01|null|
|B |2017-05-01|null|
+----+----------+----+
Forward fill here so that the following rows are filled with the same value:
w = W.partitionBy("col1").orderBy("col2")
df = df.withColumn("filled_col", F.last("col3", ignorenulls=True).over(w))
df.show()
+----+----------+----+----------+
|col1| col2|col3|filled_col|
+----+----------+----+----------+
| B|2017-01-01|null| null|
| B|2017-02-01| 78| 78|
| B|2017-03-01|null| 78|
| B|2017-04-01|null| 78|
| B|2017-05-01|null| 78|
| A|2017-01-01|null| null|
| A|2017-02-01|null| null|
| A|2017-03-01| 35| 35|
| A|2017-04-01|null| 35|
| A|2017-05-01|null| 35|
+----+----------+----+----------+
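A side note: the forward fill above relies on the default frame of an ordered window (everything from the start of the partition up to the current row). If you prefer to spell that frame out, an equivalent way to write the same fill is:
# Same forward fill, with the window frame made explicit
w = W.partitionBy("col1").orderBy("col2").rowsBetween(W.unboundedPreceding, W.currentRow)
df = df.withColumn("filled_col", F.last("col3", ignorenulls=True).over(w))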
Now we assign a row_number to each row,
because we already know that, within each client and filled-value group, the row numbered 2 should be 'Paid' and the later rows 'OK':
w2 = W.partitionBy("col1", "filled_col").orderBy("col2")
df = df.withColumn("rnk", F.row_number().over(w2))
df = df.withColumn("Result",
    F.when(F.col("filled_col").isNotNull() & (F.col("rnk") == 2), F.lit("Paid"))
     .when(F.col("filled_col").isNotNull() & (F.col("rnk") > 2), F.lit("OK")))
df.show()
+----+----------+----+----------+---+------+
|col1| col2|col3|filled_col|rnk|Result|
+----+----------+----+----------+---+------+
| B|2017-01-01|null| null| 1| null|
| B|2017-02-01| 78| 78| 1| null|
| B|2017-03-01|null| 78| 2| Paid|
| B|2017-04-01|null| 78| 3| OK|
| B|2017-05-01|null| 78| 4| OK|
| A|2017-01-01|null| null| 1| null|
| A|2017-02-01|null| null| 2| null|
| A|2017-03-01| 35| 35| 1| null|
| A|2017-04-01|null| 35| 2| Paid|
| A|2017-05-01|null| 35| 3| OK|
+----+----------+----+----------+---+------+
Select the final columns of your choice:
df = df.withColumn("Result", F.coalesce("Result" , "col3"))
df.select("col1", "col2", "col3", "Result").orderBy("col1").show()
+----+----------+----+------+
|col1| col2|col3|Result|
+----+----------+----+------+
| A|2017-01-01|null| null|
| A|2017-02-01|null| null|
| A|2017-03-01| 35| 35|
| A|2017-04-01|null| Paid|
| A|2017-05-01|null| OK|
| B|2017-01-01|null| null|
| B|2017-02-01| 78| 78|
| B|2017-03-01|null| Paid|
| B|2017-04-01|null| OK|
| B|2017-05-01|null| OK|
+----+----------+----+------+
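Putting the same steps together for the column names in your question (Client, Date, Due_Day): the helper below, mark_due_months, is just a hypothetical condensation of the steps above, not tested against your full data, so treat it as a sketch.
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

def mark_due_months(df):
    # Forward-fill Due_Day within each client, ordered by month
    w = W.partitionBy("Client").orderBy("Date")
    df = df.withColumn("filled", F.last("Due_Day", ignorenulls=True).over(w))

    # Number the months inside each (Client, filled value) group:
    # row 1 carries the value itself, row 2 becomes 'Paid', later rows become 'OK'
    w2 = W.partitionBy("Client", "filled").orderBy("Date")
    df = df.withColumn("rnk", F.row_number().over(w2))

    marker = (
        F.when(F.col("filled").isNotNull() & (F.col("rnk") == 2), F.lit("Paid"))
         .when(F.col("filled").isNotNull() & (F.col("rnk") > 2), F.lit("OK"))
    )
    # Keep the original Due_Day where it exists, otherwise use the marker (or null)
    df = df.withColumn("Result", F.coalesce(marker, F.col("Due_Day")))
    return df.select("Client", "Date", "Due_Day", "Result")

# Usage, assuming your original dataframe is named input_df and has Client, Date, Due_Day columns
mark_due_months(input_df).orderBy("Client", "Date").show()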