Window 函数与 PySpark
Window function with PySpark
我有一个 PySpark Dataframe,我的目标是创建一个 Flag
列,其值取决于 Amount
列的值。
基本上,对于每个组,我想知道是否在前三个月的任何一个月中,数量大于0,如果是这样,则所有组的 Flag 列的值将为 1,否则该值为 0。
我将包含一个示例来更好地说明。
初始 PySpark 数据帧:
Group
Month
Amount
A
1
0
A
2
0
A
3
35
A
4
0
A
5
0
B
1
0
B
2
0
C
1
0
C
2
0
C
3
0
C
4
13
D
1
0
D
2
24
D
3
0
最终 PySpark 数据框:
Group
Month
Amount
Flag
A
1
0
1
A
2
0
1
A
3
35
1
A
4
0
1
A
5
0
1
B
1
0
0
B
2
0
0
C
1
0
0
C
2
0
0
C
3
0
0
C
4
13
0
D
1
0
1
D
2
24
1
D
3
0
1
基本上,我想要的是每个组,前3个月的总和。如果该总和大于 0,则该组的所有元素的标志为 1,否则为 0。
您可以通过应用 Window
函数来创建 flag
列。创建一个 psuedo-column,如果满足条件,它变为 1,然后最后对 psuedo-column 求和,如果它大于 0,则至少有一次行满足条件并设置 flag
到 1.
from pyspark.sql import functions as F
from pyspark.sql import Window as W
data = [("A", 1, 0, ),
("A", 2, 0, ),
("A", 3, 35, ),
("A", 4, 0, ),
("A", 5, 0, ),
("B", 1, 0, ),
("B", 2, 0, ),
("C", 1, 0, ),
("C", 2, 0, ),
("C", 3, 0, ),
("C", 4, 13, ),
("D", 1, 0, ),
("D", 2, 24, ),
("D", 3, 0, ), ]
df = spark.createDataFrame(data, ("Group", "Month", "Amount", ))
ws = W.partitionBy("Group").orderBy("Month").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
criteria = F.when((F.col("Month") < 4) & (F.col("Amount") > 0), F.lit(1)).otherwise(F.lit(0))
(df.withColumn("flag", F.when(F.sum(criteria).over(ws) > 0, F.lit(1)).otherwise(F.lit(0)))
).show()
"""
+-----+-----+------+----+
|Group|Month|Amount|flag|
+-----+-----+------+----+
| A| 1| 0| 1|
| A| 2| 0| 1|
| A| 3| 35| 1|
| A| 4| 0| 1|
| A| 5| 0| 1|
| B| 1| 0| 0|
| B| 2| 0| 0|
| C| 1| 0| 0|
| C| 2| 0| 0|
| C| 3| 0| 0|
| C| 4| 13| 0|
| D| 1| 0| 1|
| D| 2| 24| 1|
| D| 3| 0| 1|
+-----+-----+------+----+
"""
您可以将 Window 函数与 count
和 when
一起使用。
w = Window.partitionBy('Group')
df = df.withColumn('Flag', F.count(
F.when((F.col('Month') < 4) & (F.col('Amount') > 0), True)).over(w))
.withColumn('Flag', F.when(F.col('Flag') > 0, 1).otherwise(0))
我有一个 PySpark Dataframe,我的目标是创建一个 Flag
列,其值取决于 Amount
列的值。
基本上,对于每个组,我想知道是否在前三个月的任何一个月中,数量大于0,如果是这样,则所有组的 Flag 列的值将为 1,否则该值为 0。
我将包含一个示例来更好地说明。
初始 PySpark 数据帧:
Group | Month | Amount |
---|---|---|
A | 1 | 0 |
A | 2 | 0 |
A | 3 | 35 |
A | 4 | 0 |
A | 5 | 0 |
B | 1 | 0 |
B | 2 | 0 |
C | 1 | 0 |
C | 2 | 0 |
C | 3 | 0 |
C | 4 | 13 |
D | 1 | 0 |
D | 2 | 24 |
D | 3 | 0 |
最终 PySpark 数据框:
Group | Month | Amount | Flag |
---|---|---|---|
A | 1 | 0 | 1 |
A | 2 | 0 | 1 |
A | 3 | 35 | 1 |
A | 4 | 0 | 1 |
A | 5 | 0 | 1 |
B | 1 | 0 | 0 |
B | 2 | 0 | 0 |
C | 1 | 0 | 0 |
C | 2 | 0 | 0 |
C | 3 | 0 | 0 |
C | 4 | 13 | 0 |
D | 1 | 0 | 1 |
D | 2 | 24 | 1 |
D | 3 | 0 | 1 |
基本上,我想要的是每个组,前3个月的总和。如果该总和大于 0,则该组的所有元素的标志为 1,否则为 0。
您可以通过应用 Window
函数来创建 flag
列。创建一个 psuedo-column,如果满足条件,它变为 1,然后最后对 psuedo-column 求和,如果它大于 0,则至少有一次行满足条件并设置 flag
到 1.
from pyspark.sql import functions as F
from pyspark.sql import Window as W
data = [("A", 1, 0, ),
("A", 2, 0, ),
("A", 3, 35, ),
("A", 4, 0, ),
("A", 5, 0, ),
("B", 1, 0, ),
("B", 2, 0, ),
("C", 1, 0, ),
("C", 2, 0, ),
("C", 3, 0, ),
("C", 4, 13, ),
("D", 1, 0, ),
("D", 2, 24, ),
("D", 3, 0, ), ]
df = spark.createDataFrame(data, ("Group", "Month", "Amount", ))
ws = W.partitionBy("Group").orderBy("Month").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
criteria = F.when((F.col("Month") < 4) & (F.col("Amount") > 0), F.lit(1)).otherwise(F.lit(0))
(df.withColumn("flag", F.when(F.sum(criteria).over(ws) > 0, F.lit(1)).otherwise(F.lit(0)))
).show()
"""
+-----+-----+------+----+
|Group|Month|Amount|flag|
+-----+-----+------+----+
| A| 1| 0| 1|
| A| 2| 0| 1|
| A| 3| 35| 1|
| A| 4| 0| 1|
| A| 5| 0| 1|
| B| 1| 0| 0|
| B| 2| 0| 0|
| C| 1| 0| 0|
| C| 2| 0| 0|
| C| 3| 0| 0|
| C| 4| 13| 0|
| D| 1| 0| 1|
| D| 2| 24| 1|
| D| 3| 0| 1|
+-----+-----+------+----+
"""
您可以将 Window 函数与 count
和 when
一起使用。
w = Window.partitionBy('Group')
df = df.withColumn('Flag', F.count(
F.when((F.col('Month') < 4) & (F.col('Amount') > 0), True)).over(w))
.withColumn('Flag', F.when(F.col('Flag') > 0, 1).otherwise(0))