在 pyspark 数据框中循环遍历两列时向新列添加值
Adding values to a new column while looping through two columns in a pyspark dataframe
我有一个带有列的 pyspark 数据框(除了一些列):
每个月都有多个 ID。每个 id 的活动状态由 amount 列确定。如果金额 > 0,则活动 = 1,否则为 0。
+-----------------------------+---
|id|amount| dates | active |
+-----------------------------+---
| X| 0|2019-05-01| 0 |
| X| 120|2019-06-01| 1 |
| Y| 60|2019-06-01| 1 |
| X| 0|2019-07-01| 0 |
| Y| 0|2019-07-01| 0 |
| Z| 50|2019-06-01| 1 |
| Y| 0|2019-07-01| 0 |
+-----------------------------+---
我要计算和添加的新列是p3mactive。
它是根据过去三个月的活跃状态计算的。
例如:对于 id = x,date = 2019-08-01,p3mactive = 1,因为 X 在 2019-06-01 处于活动状态。
如果之前的月份不存在,则 p3m active = 0。如果只有 1 或 2 个月,则 p3m active 可以简单地计算为 max(active(month-1), active(month-2))。基本上在现有专栏的基础上。
+-----------------------------+-----------+
|id|amount| dates | active | p3mactive |
+-----------------------------+-----------+
| X| 0|2019-05-01| 0 | 0 |
| X| 120|2019-06-01| 1 | 0 |
| Y| 60|2019-06-01| 1 | 0 |
| X| 0|2019-07-01| 0 | 1 |
| Y| 0|2019-07-01| 0 | 1 |
| Z| 50|2019-06-01| 1 | 0 |
| Y| 0|2019-07-01| 0 | 1 |
+-----------------------------+-----------+
所以基本上:
- X for 05 有 active 0,之前没有月,因此 p3mactive 是 0。
- Y 在 06 中激活,因此在 07 中 p3mactive = 1,而在 06 中 p3mactive 仍为 0。
- Z只有06的数据所以06中的p3mactive为0
等等。如果对流程有任何疑问,请告诉我。
我想在 pyspark 中使用更好的数据帧操作和函数来实现它。
一般来说,我可以很容易地想到如何用 pandas 或 python 来做到这一点,但我是新手,想不出一种方法来遍历 ids,每个给定的月份然后 select 前三个月的活动状态进入 max(m1,m2,m3) 函数,如果前几个月不存在,则保持边缘条件。任何帮助将不胜感激。
您可以使用 when
and lag
using a Window
函数来执行此操作:
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, lag
w = Window().partitionBy("id").orderBy("dates")
df = df.withColumn("p3mactive", when(
(lag(df.active,1).over(w) == 1)|
(lag(df.active,2).over(w) == 1) |
(lag(df.active,3).over(w) == 1), 1).otherwise(0))
您不能遍历 pyspark 数据帧,但可以使用 Window
跨越它们。您可以使用 when
应用条件,您可以使用 lag
查看之前的行,使用 lead
查看未来的行。如果 x
之前的行不存在,则条件评估为 false,您将得到 0
作为您的用例提到的。
希望对您有所帮助。
我有一个带有列的 pyspark 数据框(除了一些列): 每个月都有多个 ID。每个 id 的活动状态由 amount 列确定。如果金额 > 0,则活动 = 1,否则为 0。
+-----------------------------+---
|id|amount| dates | active |
+-----------------------------+---
| X| 0|2019-05-01| 0 |
| X| 120|2019-06-01| 1 |
| Y| 60|2019-06-01| 1 |
| X| 0|2019-07-01| 0 |
| Y| 0|2019-07-01| 0 |
| Z| 50|2019-06-01| 1 |
| Y| 0|2019-07-01| 0 |
+-----------------------------+---
我要计算和添加的新列是p3mactive。 它是根据过去三个月的活跃状态计算的。 例如:对于 id = x,date = 2019-08-01,p3mactive = 1,因为 X 在 2019-06-01 处于活动状态。 如果之前的月份不存在,则 p3m active = 0。如果只有 1 或 2 个月,则 p3m active 可以简单地计算为 max(active(month-1), active(month-2))。基本上在现有专栏的基础上。
+-----------------------------+-----------+
|id|amount| dates | active | p3mactive |
+-----------------------------+-----------+
| X| 0|2019-05-01| 0 | 0 |
| X| 120|2019-06-01| 1 | 0 |
| Y| 60|2019-06-01| 1 | 0 |
| X| 0|2019-07-01| 0 | 1 |
| Y| 0|2019-07-01| 0 | 1 |
| Z| 50|2019-06-01| 1 | 0 |
| Y| 0|2019-07-01| 0 | 1 |
+-----------------------------+-----------+
所以基本上:
- X for 05 有 active 0,之前没有月,因此 p3mactive 是 0。
- Y 在 06 中激活,因此在 07 中 p3mactive = 1,而在 06 中 p3mactive 仍为 0。
- Z只有06的数据所以06中的p3mactive为0
等等。如果对流程有任何疑问,请告诉我。
我想在 pyspark 中使用更好的数据帧操作和函数来实现它。 一般来说,我可以很容易地想到如何用 pandas 或 python 来做到这一点,但我是新手,想不出一种方法来遍历 ids,每个给定的月份然后 select 前三个月的活动状态进入 max(m1,m2,m3) 函数,如果前几个月不存在,则保持边缘条件。任何帮助将不胜感激。
您可以使用 when
and lag
using a Window
函数来执行此操作:
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, lag
w = Window().partitionBy("id").orderBy("dates")
df = df.withColumn("p3mactive", when(
(lag(df.active,1).over(w) == 1)|
(lag(df.active,2).over(w) == 1) |
(lag(df.active,3).over(w) == 1), 1).otherwise(0))
您不能遍历 pyspark 数据帧,但可以使用 Window
跨越它们。您可以使用 when
应用条件,您可以使用 lag
查看之前的行,使用 lead
查看未来的行。如果 x
之前的行不存在,则条件评估为 false,您将得到 0
作为您的用例提到的。
希望对您有所帮助。