根据条件创建一个列并继承以前的值

create a column based on a condition and carrying over previous values

我有以下按 "col1" 排序的数据框。

+----+----+
|col1|col2|
+----+----+
|   a|   x|
|   a|   x|
|   a|   y|
|   b|   x|
|   b|   z|
|   c|   x|
|   c|   y|
|   d|   z|
|   d|   x|
+----+----+

我想添加一个新列说 "col3",对于唯一组中的每一行('a'、'b'、'c' 'd') in "col1" if "col2" value in ('x' or 'y') 将值增加 1 else 如果值为 'z' 或任何其他进位值超过价值。例如,在 "a" 的第一行中,因为 col2 是 x,我们通过添加 0 + 1 = 1 来递增 1,在第二行中,因为 col2 再次是 x,我们递增 1 + 1 = 2 等等。对于 col1 值为 b(第 4 行)的第二组,我们开始新的,因为 col2 值为 x,我们递增 0 + 1 = 1。在第 5 行,因为 col2 值为 z,我们不递增并取以前的值,即 1 . 在"d"的情况下(第8行)。由于 col2 值不在 x 或 y 中,我们不递增并将其保留为 0。

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   x|   1|
|   a|   x|   2|
|   a|   y|   3|
|   b|   x|   1|
|   b|   z|   1|
|   c|   x|   1|
|   c|   y|   2|
|   d|   z|   0|
|   d|   x|   1|
+----+----+----+

无论如何,我可以在不使用 pyspark 中的 UDF 的情况下实现这一点

使用 window 对 col1 进行分区,然后使用条件表达式创建一个新列。

from pyspark.sql.functions import *
from pyspark.sql import Window

w = Window.partitionBy("col1").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("col3", sum(when(col("col2").isin("x", "y"), 1).otherwise(0)).over(w)).orderBy("col1").show(10)

代码的结果正是您想要的。

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   x|   1|
|   a|   x|   2|
|   a|   y|   3|
|   b|   x|   1|
|   b|   z|   1|
|   c|   x|   1|
|   c|   y|   2|
|   d|   z|   0|
|   d|   x|   1|
+----+----+----+