Window 函数与 PySpark

Window function with PySpark

我有一个 PySpark Dataframe,我的目标是创建一个 Flag 列,其值取决于 Amount 列的值。 基本上,对于每个组,我想知道是否在前三个月的任何一个月中,数量大于0,如果是这样,则所有组的 Flag 列的值将为 1,否则该值为 0。

我将包含一个示例来更好地说明。

初始 PySpark 数据帧:

Group Month Amount
A 1 0
A 2 0
A 3 35
A 4 0
A 5 0
B 1 0
B 2 0
C 1 0
C 2 0
C 3 0
C 4 13
D 1 0
D 2 24
D 3 0

最终 PySpark 数据框:

Group Month Amount Flag
A 1 0 1
A 2 0 1
A 3 35 1
A 4 0 1
A 5 0 1
B 1 0 0
B 2 0 0
C 1 0 0
C 2 0 0
C 3 0 0
C 4 13 0
D 1 0 1
D 2 24 1
D 3 0 1

基本上,我想要的是每个组,前3个月的总和。如果该总和大于 0,则该组的所有元素的标志为 1,否则为 0。

您可以通过应用 Window 函数来创建 flag 列。创建一个 psuedo-column,如果满足条件,它变为 1,然后最后对 psuedo-column 求和,如果它大于 0,则至少有一次行满足条件并设置 flag 到 1.

from pyspark.sql import functions as F
from pyspark.sql import Window as W

data = [("A", 1, 0, ), 
("A", 2, 0, ), 
("A", 3, 35, ), 
("A", 4, 0, ), 
("A", 5, 0, ), 
("B", 1, 0, ), 
("B", 2, 0, ), 
("C", 1, 0, ), 
("C", 2, 0, ), 
("C", 3, 0, ), 
("C", 4, 13, ), 
("D", 1, 0, ), 
("D", 2, 24, ), 
("D", 3, 0, ), ]

df = spark.createDataFrame(data, ("Group", "Month", "Amount", ))

ws = W.partitionBy("Group").orderBy("Month").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

criteria = F.when((F.col("Month") < 4) & (F.col("Amount") > 0), F.lit(1)).otherwise(F.lit(0))

(df.withColumn("flag", F.when(F.sum(criteria).over(ws) > 0, F.lit(1)).otherwise(F.lit(0)))
).show()

"""
+-----+-----+------+----+
|Group|Month|Amount|flag|
+-----+-----+------+----+
|    A|    1|     0|   1|
|    A|    2|     0|   1|
|    A|    3|    35|   1|
|    A|    4|     0|   1|
|    A|    5|     0|   1|
|    B|    1|     0|   0|
|    B|    2|     0|   0|
|    C|    1|     0|   0|
|    C|    2|     0|   0|
|    C|    3|     0|   0|
|    C|    4|    13|   0|
|    D|    1|     0|   1|
|    D|    2|    24|   1|
|    D|    3|     0|   1|
+-----+-----+------+----+
"""

您可以将 Window 函数与 countwhen 一起使用。

w = Window.partitionBy('Group')
df = df.withColumn('Flag', F.count(
        F.when((F.col('Month') < 4) & (F.col('Amount') > 0), True)).over(w))
     .withColumn('Flag', F.when(F.col('Flag') > 0, 1).otherwise(0))