Pyspark:Drop/Filter 行基于列和排名的总和
Pyspark: Drop/Filter rows based on Summing of columns and Rank
我有一个这样的数据框:
df = pd.DataFrame({"Date": ["2020-05-10", "2020-05-10", "2020-05-10", "2020-05-11", "2020-05-11", "2020-05-11", "2020-05-11", "2020-05-11", "2020-05-11"],
"Slot_Length": [30, 30, 30, 30, 30, 30, 30, 30, 30],
"Total_Space": [60, 60, 60, 120, 120, 120, 120, 120, 120],
"Amount_Over": [-30, -30, -30, -60, -60, -60, -60, -60, -60],
"Rank": [1, 1, 2, 1, 1, 1, 1, 2, 2]})
df = spark.createDataFrame(df)
+----------+-----------+-----------+-----------+----+
| Date|Slot_Length|Total_Space|Amount_Over|Rank|
+----------+-----------+-----------+-----------+----+
|2020-05-10| 30| 60| -30| 1|
|2020-05-10| 30| 60| -30| 1|
|2020-05-10| 30| 60| -30| 2|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 2|
|2020-05-11| 30| 120| -60| 2|
+----------+-----------+-----------+-----------+----+
对于每个 Date
我有一个 Total_Space
可以填充。所以对于 2020-05-10
,我有 60 秒,对于 2020-05-11
,我有 120 秒。
每个 Date
也已经分配了具有特定 Slot_Length
的插槽。
对于每个 Date
,我已经计算出 Date
在 Amount_Over
列中结束的 space 的数量,并根据优先级列对它们进行了适当的排名此处未显示。
我想做的是删除具有最低 Rank
的行以获得 Date
,直到 Slot_Length
加起来等于 Total_Space
Date
。
+----------+-----------+-----------+-----------+----+
| Date|Slot_Length|Total_Space|Amount_Over|Rank|
+----------+-----------+-----------+-----------+----+
|2020-05-10| 30| 60| -30| 1|
|2020-05-10| 30| 60| -30| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
+----------+-----------+-----------+-----------+----+
在这个例子中,把所有 Rank
都等于 2 一样简单,但是会有排名之间有平局的例子,所以先取最高的排名,然后随机取一个如果有平局,一个。
最好的方法是什么?我已经明白它需要一个 Window 日期函数才能正确地对 Slot_Length
、Total_Space
和 Amount_Over
列进行每个计算。
df = pd.DataFrame({"Date": ["2020-05-10", "2020-05-10", "2020-05-10", "2020-05-11", "2020-05-11", "2020-05-11",
"2020-05-11", "2020-05-11", "2020-05-11"],
"Slot_Length": [30, 30, 30, 30, 30, 30, 30, 30, 30],
"Total_Space": [60, 60, 60, 120, 120, 120, 120, 120, 120],
"Amount_Over": [-30, -30, -30, -60, -60, -60, -60, -60, -60],
"Rank": [1, 1, 2, 1, 1, 1, 1, 2, 2]})
df = spark.createDataFrame(df)
w = Window.partitionBy("Date").orderBy("Rank").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn(
"Cumulative_Sum", F.sum("Slot_Length").over(w)
).filter(
F.col("Cumulative_Sum") <= F.col("Total_Space")
).orderBy("Date","Rank","Cumulative_Sum").show()
结果
+----------+-----------+-----------+-----------+----+--------------+
| Date|Slot_Length|Total_Space|Amount_Over|Rank|Cumulative_Sum|
+----------+-----------+-----------+-----------+----+--------------+
|2020-05-10| 30| 60| -30| 1| 30|
|2020-05-10| 30| 60| -30| 1| 60|
|2020-05-11| 30| 120| -60| 1| 30|
|2020-05-11| 30| 120| -60| 1| 60|
|2020-05-11| 30| 120| -60| 1| 90|
|2020-05-11| 30| 120| -60| 1| 120|
+----------+-----------+-----------+-----------+----+--------------+
我有一个这样的数据框:
df = pd.DataFrame({"Date": ["2020-05-10", "2020-05-10", "2020-05-10", "2020-05-11", "2020-05-11", "2020-05-11", "2020-05-11", "2020-05-11", "2020-05-11"],
"Slot_Length": [30, 30, 30, 30, 30, 30, 30, 30, 30],
"Total_Space": [60, 60, 60, 120, 120, 120, 120, 120, 120],
"Amount_Over": [-30, -30, -30, -60, -60, -60, -60, -60, -60],
"Rank": [1, 1, 2, 1, 1, 1, 1, 2, 2]})
df = spark.createDataFrame(df)
+----------+-----------+-----------+-----------+----+
| Date|Slot_Length|Total_Space|Amount_Over|Rank|
+----------+-----------+-----------+-----------+----+
|2020-05-10| 30| 60| -30| 1|
|2020-05-10| 30| 60| -30| 1|
|2020-05-10| 30| 60| -30| 2|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 2|
|2020-05-11| 30| 120| -60| 2|
+----------+-----------+-----------+-----------+----+
对于每个 Date
我有一个 Total_Space
可以填充。所以对于 2020-05-10
,我有 60 秒,对于 2020-05-11
,我有 120 秒。
每个 Date
也已经分配了具有特定 Slot_Length
的插槽。
对于每个 Date
,我已经计算出 Date
在 Amount_Over
列中结束的 space 的数量,并根据优先级列对它们进行了适当的排名此处未显示。
我想做的是删除具有最低 Rank
的行以获得 Date
,直到 Slot_Length
加起来等于 Total_Space
Date
。
+----------+-----------+-----------+-----------+----+
| Date|Slot_Length|Total_Space|Amount_Over|Rank|
+----------+-----------+-----------+-----------+----+
|2020-05-10| 30| 60| -30| 1|
|2020-05-10| 30| 60| -30| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
+----------+-----------+-----------+-----------+----+
在这个例子中,把所有 Rank
都等于 2 一样简单,但是会有排名之间有平局的例子,所以先取最高的排名,然后随机取一个如果有平局,一个。
最好的方法是什么?我已经明白它需要一个 Window 日期函数才能正确地对 Slot_Length
、Total_Space
和 Amount_Over
列进行每个计算。
df = pd.DataFrame({"Date": ["2020-05-10", "2020-05-10", "2020-05-10", "2020-05-11", "2020-05-11", "2020-05-11",
"2020-05-11", "2020-05-11", "2020-05-11"],
"Slot_Length": [30, 30, 30, 30, 30, 30, 30, 30, 30],
"Total_Space": [60, 60, 60, 120, 120, 120, 120, 120, 120],
"Amount_Over": [-30, -30, -30, -60, -60, -60, -60, -60, -60],
"Rank": [1, 1, 2, 1, 1, 1, 1, 2, 2]})
df = spark.createDataFrame(df)
w = Window.partitionBy("Date").orderBy("Rank").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn(
"Cumulative_Sum", F.sum("Slot_Length").over(w)
).filter(
F.col("Cumulative_Sum") <= F.col("Total_Space")
).orderBy("Date","Rank","Cumulative_Sum").show()
结果
+----------+-----------+-----------+-----------+----+--------------+
| Date|Slot_Length|Total_Space|Amount_Over|Rank|Cumulative_Sum|
+----------+-----------+-----------+-----------+----+--------------+
|2020-05-10| 30| 60| -30| 1| 30|
|2020-05-10| 30| 60| -30| 1| 60|
|2020-05-11| 30| 120| -60| 1| 30|
|2020-05-11| 30| 120| -60| 1| 60|
|2020-05-11| 30| 120| -60| 1| 90|
|2020-05-11| 30| 120| -60| 1| 120|
+----------+-----------+-----------+-----------+----+--------------+