PySpark 在条件下枚举分区
PySpark enumerate in partition under condition
我有一个 pySpark 数据框,例如:
class classID Property
1 1 1
1 2 0
1 3 1
1 4 1
2 1 0
2 2 0
2 3 1
现在我需要添加一个列,其中包含当前分区中有多少行的信息,直到该行确实具有 属性 == 1。像这里一样:
class classID Property relevantCount
1 1 1 1
1 2 0 1
1 3 1 2
1 4 1 3
2 1 0 0
2 2 0 0
2 3 1 1
例如我尝试了 Window 函数:
import pyspark.sql.functions as f
from pyspark.sql.window import Window
windowSpec = Window().partitionBy('class').orderBy(f.col('classID'))
df = df \
.withColumn('relevantCount',(f.when((f.col('rank') == f.lit(1)) & (f.col('Property') == f.lit(0)),0)).otherwise(f.col('Property')+f.col(f.lag('deliveryCountDesc').over(windowSpec))))
但我无法参考新行的先前值。
有没有人有更好的主意?
您的 window 规范需要修改以包含分区中所有先前的行,并使用匹配的 属性 值进行计数。
试试这个
import pyspark.sql.functions as f
from pyspark.sql.window import Window
data = [(1, 1,1),(1, 2,0),(1, 3,1),(1, 4,1),(2,1 , 0),(2, 2, 0),(2, 3, 1)]
df = spark.createDataFrame(data,['class','classID','Property'])
windowSpec = Window().partitionBy('class').orderBy('classID').rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn('relevantCount',f.count(f.when(f.col('Property') == 1, f.col('Property'))).over(windowSpec))
df.show()
结果 >
+-----+-------+--------+-------------+
|class|classID|Property|relevantCount|
+-----+-------+--------+-------------+
| 1| 1| 1| 1|
| 1| 2| 0| 1|
| 1| 3| 1| 2|
| 1| 4| 1| 3|
| 2| 1| 0| 0|
| 2| 2| 0| 0|
| 2| 3| 1| 1|
+-----+-------+--------+-------------+
我有一个 pySpark 数据框,例如:
class classID Property
1 1 1
1 2 0
1 3 1
1 4 1
2 1 0
2 2 0
2 3 1
现在我需要添加一个列,其中包含当前分区中有多少行的信息,直到该行确实具有 属性 == 1。像这里一样:
class classID Property relevantCount
1 1 1 1
1 2 0 1
1 3 1 2
1 4 1 3
2 1 0 0
2 2 0 0
2 3 1 1
例如我尝试了 Window 函数:
import pyspark.sql.functions as f
from pyspark.sql.window import Window
windowSpec = Window().partitionBy('class').orderBy(f.col('classID'))
df = df \
.withColumn('relevantCount',(f.when((f.col('rank') == f.lit(1)) & (f.col('Property') == f.lit(0)),0)).otherwise(f.col('Property')+f.col(f.lag('deliveryCountDesc').over(windowSpec))))
但我无法参考新行的先前值。
有没有人有更好的主意?
您的 window 规范需要修改以包含分区中所有先前的行,并使用匹配的 属性 值进行计数。 试试这个
import pyspark.sql.functions as f
from pyspark.sql.window import Window
data = [(1, 1,1),(1, 2,0),(1, 3,1),(1, 4,1),(2,1 , 0),(2, 2, 0),(2, 3, 1)]
df = spark.createDataFrame(data,['class','classID','Property'])
windowSpec = Window().partitionBy('class').orderBy('classID').rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn('relevantCount',f.count(f.when(f.col('Property') == 1, f.col('Property'))).over(windowSpec))
df.show()
结果 >
+-----+-------+--------+-------------+
|class|classID|Property|relevantCount|
+-----+-------+--------+-------------+
| 1| 1| 1| 1|
| 1| 2| 0| 1|
| 1| 3| 1| 2|
| 1| 4| 1| 3|
| 2| 1| 0| 0|
| 2| 2| 0| 0|
| 2| 3| 1| 1|
+-----+-------+--------+-------------+