在 pyspark 数据框中循环遍历两列时向新列添加值

Adding values to a new column while looping through two columns in a pyspark dataframe

我有一个带有列的 pyspark 数据框(除了一些列): 每个月都有多个 ID。每个 id 的活动状态由 amount 列确定。如果金额 > 0,则活动 = 1,否则为 0。

+-----------------------------+---
|id|amount|  dates   | active |
+-----------------------------+---
| X|     0|2019-05-01|    0   |
| X|   120|2019-06-01|    1   |      
| Y|    60|2019-06-01|    1   |
| X|     0|2019-07-01|    0   |
| Y|     0|2019-07-01|    0   |
| Z|    50|2019-06-01|    1   |
| Y|     0|2019-07-01|    0   |
+-----------------------------+---

我要计算和添加的新列是p3mactive。 它是根据过去三个月的活跃状态计算的。 例如:对于 id = x,date = 2019-08-01,p3mactive = 1,因为 X 在 2019-06-01 处于活动状态。 如果之前的月份不存在,则 p3m active = 0。如果只有 1 或 2 个月,则 p3m active 可以简单地计算为 max(active(month-1), active(month-2))。基本上在现有专栏的基础上。

+-----------------------------+-----------+
|id|amount|  dates   | active | p3mactive |
+-----------------------------+-----------+
| X|     0|2019-05-01|    0   |     0     |
| X|   120|2019-06-01|    1   |     0     |      
| Y|    60|2019-06-01|    1   |     0     |
| X|     0|2019-07-01|    0   |     1     |
| Y|     0|2019-07-01|    0   |     1     |
| Z|    50|2019-06-01|    1   |     0     |
| Y|     0|2019-07-01|    0   |     1     |
+-----------------------------+-----------+

所以基本上:

  1. X for 05 有 active 0,之前没有月,因此 p3mactive 是 0。
  2. Y 在 06 中激活,因此在 07 中 p3mactive = 1,而在 06 中 p3mactive 仍为 0。
  3. Z只有06的数据所以06中的p3mactive为0

等等。如果对流程有任何疑问,请告诉我。

我想在 pyspark 中使用更好的数据帧操作和函数来实现它。 一般来说,我可以很容易地想到如何用 pandas 或 python 来做到这一点,但我是新手,想不出一种方法来遍历 ids,每个给定的月份然后 select 前三个月的活动状态进入 max(m1,m2,m3) 函数,如果前几个月不存在,则保持边缘条件。任何帮助将不胜感激。

您可以使用 when and lag using a Window 函数来执行此操作:

from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, lag

w = Window().partitionBy("id").orderBy("dates")
df = df.withColumn("p3mactive", when(
    (lag(df.active,1).over(w) == 1)| 
    (lag(df.active,2).over(w) == 1) | 
    (lag(df.active,3).over(w) == 1), 1).otherwise(0))

您不能遍历 pyspark 数据帧,但可以使用 Window 跨越它们。您可以使用 when 应用条件,您可以使用 lag 查看之前的行,使用 lead 查看未来的行。如果 x 之前的行不存在,则条件评估为 false,您将得到 0 作为您的用例提到的。

希望对您有所帮助。