How to calculate a cumulative sum on a column with a condition threshold in PySpark

I have a dataframe as shown below. I want to compute a cumulative sum of V for each id, such that whenever the previous row's cumulative sum is greater than or equal to the threshold 25, the sum resets to the current row's value, as shown in the figure below. I tried writing a UDF over V, but I got an error saying it is not iterable. I also tried lag, without success. Any help is appreciated!

df = sqlContext.createDataFrame(
[('Mark', 0), ('Mark', 1), ('Mark', 1),
('Mark', 1), ('Mark', 25), ('Mark', 1),
('Mark', 1),('Mark', 1),('Mark', 20),
('Mark', 1),('Mark', 1),('Mark', 1),
('Mark', 1),('Mark', 1),('John', 0),
('John', 1),('John', 1),('John', 1),
('John', 1),('John', 1),('John', 1),
('John', 1),('John', 9),('John', 1),
('John', 1),('John', 1),('John', 1),
('John', 1),('John', 1),('John', 1),
('John', 1),('John', 1),('John', 1),
('John', 7),('John', 1)], 
['id', 'V'])
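For reference, the reset rule described above can be sketched in plain Python (outside Spark) to make the expected output concrete; the function name and the threshold default are just illustrative:

```python
def reset_cumsum(values, threshold=25):
    """Running sum that resets once the previous row's total reaches the threshold."""
    out, running = [], 0
    for v in values:
        if running >= threshold:  # previous cumulative total hit the threshold
            running = 0           # reset before adding the current value
        running += v
        out.append(running)
    return out

mark = [0, 1, 1, 1, 25, 1, 1, 1, 20, 1, 1, 1, 1, 1]
print(reset_cumsum(mark))
# → [0, 1, 2, 3, 28, 1, 2, 3, 23, 24, 25, 1, 2, 3]
```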

Maybe there is a prettier and faster way; this will work, but it is not efficient. Windows are expensive to use (memory), so be careful if you plan to use this in production. If you need speed, a custom UDF used with a single window would probably be a bit faster than this, since it avoids the double windowing:

    // assumes a SparkSession in scope with its implicits imported (for .toDF)
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, floor, lag, sum}

    val df = Seq(("Mark", 0), ("Mark", 1), ("Mark", 1),
    ("Mark", 1), ("Mark", 25), ("Mark", 1),
    ("Mark", 1),("Mark", 1),("Mark", 20),
    ("Mark", 1),("Mark", 1),("Mark", 1),
    ("Mark", 1),("Mark", 1),("John", 0),
    ("John", 1),("John", 1),("John", 1),
    ("John", 1),("John", 1),("John", 1),
    ("John", 1),("John", 9),("John", 1),
    ("John", 1),("John", 1),("John", 1),
    ("John", 1),("John", 1),("John", 1),
    ("John", 1),("John", 1),("John", 1),
    ("John", 7),("John", 1)).toDF("id","V")
    // note: ordering by the partition key "id" is arbitrary within a partition;
    // with real data you would order by a timestamp or sequence column instead
    val windowSpecLag = Window.partitionBy("id").orderBy("id")
    val windowSpec = Window.partitionBy("id").orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    // add a running sum to the window
    val divis = df.withColumn("sum", sum("V").over(windowSpec)).withColumn("divis", floor(col("sum")/25))
    // shift around the numbers so the math works as desired
    val lagged = divis.withColumn("clag", lag("divis", 1, 0).over(windowSpecLag))
    // re-run the running total on the newly partitioned data
    val windowSpecFixed = Window.partitionBy("id","clag").orderBy("id","clag").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    lagged.withColumn("runningTotalUnder25", sum("V").over(windowSpecFixed)).show(100)
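To see what the floor/lag trick does, here is a plain-Python trace of the same steps on one id's rows (a sketch for illustration only; the names are mine, and Spark itself is not involved). Note that splitting groups where the partition-wide running sum crosses a multiple of 25 can give slightly different resets than the exact "previous total >= 25" rule:

```python
from itertools import accumulate

def split_and_resum(values, threshold=25):
    sums = list(accumulate(values))         # running sum over the whole partition
    divis = [s // threshold for s in sums]  # floor(sum / threshold)
    clag = [0] + divis[:-1]                 # lag(divis, 1, default 0)
    out, running, prev = [], 0, 0
    for v, g in zip(values, clag):
        if g != prev:                       # a new (id, clag) group starts here
            running, prev = 0, g
        running += v                        # running total within the group
        out.append(running)
    return out

mark = [0, 1, 1, 1, 25, 1, 1, 1, 20, 1, 1, 1, 1, 1]
print(split_and_resum(mark))
# → [0, 1, 2, 3, 28, 1, 2, 3, 23, 1, 2, 3, 4, 5]
```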

If you want to do this efficiently, I would probably try to reframe the problem so that a group-by can be used, or change how the data is defined.