How to calculate a cumulative sum on a column with a condition threshold in PySpark
I have a dataframe like the one below. I want to compute a cumulative sum of V for each id, such that whenever the cumulative sum up to the previous row is greater than or equal to the threshold 25, the sum resets to the current row's value, as illustrated in the example output below. I tried writing a user-defined function over V, but I got an error saying the column is not iterable. I also tried using lag, with no luck. Any help is appreciated!
df = sqlContext.createDataFrame(
    [('Mark', 0), ('Mark', 1), ('Mark', 1),
     ('Mark', 1), ('Mark', 25), ('Mark', 1),
     ('Mark', 1), ('Mark', 1), ('Mark', 20),
     ('Mark', 1), ('Mark', 1), ('Mark', 1),
     ('Mark', 1), ('Mark', 1), ('John', 0),
     ('John', 1), ('John', 1), ('John', 1),
     ('John', 1), ('John', 1), ('John', 1),
     ('John', 1), ('John', 9), ('John', 1),
     ('John', 1), ('John', 1), ('John', 1),
     ('John', 1), ('John', 1), ('John', 1),
     ('John', 1), ('John', 1), ('John', 1),
     ('John', 7), ('John', 1)],
    ['id', 'V'])
There may be a prettier and faster way; this works, but it isn't efficient. Windows are expensive (memory) to use, so be careful if you plan to run this in production. If you need speed, a custom UDF used with a single window would likely be a bit faster than this, since it avoids the double windowing:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // for .toDF on a local Seq

val df = Seq(("Mark", 0), ("Mark", 1), ("Mark", 1),
  ("Mark", 1), ("Mark", 25), ("Mark", 1),
  ("Mark", 1), ("Mark", 1), ("Mark", 20),
  ("Mark", 1), ("Mark", 1), ("Mark", 1),
  ("Mark", 1), ("Mark", 1), ("John", 0),
  ("John", 1), ("John", 1), ("John", 1),
  ("John", 1), ("John", 1), ("John", 1),
  ("John", 1), ("John", 9), ("John", 1),
  ("John", 1), ("John", 1), ("John", 1),
  ("John", 1), ("John", 1), ("John", 1),
  ("John", 1), ("John", 1), ("John", 1),
  ("John", 7), ("John", 1)).toDF("id", "V")

// NB: ordering by the partition key is effectively arbitrary; with real data,
// order by a timestamp or sequence column instead.
val windowSpecLag = Window.partitionBy("id").orderBy("id")
val windowSpec = Window.partitionBy("id").orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)

// add a running sum over the window, then bucket it by multiples of the threshold
val divis = df.withColumn("sum", sum("V").over(windowSpec)).withColumn("divis", floor(col("sum") / 25))

// shift the bucket index down one row so the math works as desired
val lagged = divis.withColumn("clag", lag("divis", 1, 0).over(windowSpecLag))

// re-run the running total on the newly partitioned data
val windowSpecFixed = Window.partitionBy("id", "clag").orderBy("id", "clag").rowsBetween(Window.unboundedPreceding, Window.currentRow)
lagged.withColumn("runningTotalUnder25", sum("V").over(windowSpecFixed)).show(100)
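Since the question asks for PySpark, a rough translation of the same double-window approach might look like the following (a sketch under the same caveats, using the df defined in the question):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# same caveat as above: with real data, order by a timestamp or sequence
# column instead of the partition key
window_spec_lag = Window.partitionBy("id").orderBy("id")
window_spec = (Window.partitionBy("id").orderBy("id")
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# running sum per id, bucketed by multiples of the threshold
divis = (df.withColumn("sum", F.sum("V").over(window_spec))
           .withColumn("divis", F.floor(F.col("sum") / 25)))

# shift the bucket index down one row so the row that crosses the threshold
# still closes out the previous bucket
lagged = divis.withColumn("clag", F.lag("divis", 1, 0).over(window_spec_lag))

# re-run the running total inside each (id, bucket) group
window_spec_fixed = (Window.partitionBy("id", "clag").orderBy("id", "clag")
                     .rowsBetween(Window.unboundedPreceding, Window.currentRow))
lagged.withColumn("runningTotalUnder25",
                  F.sum("V").over(window_spec_fixed)).show(100)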
If you want to do this efficiently, I would probably try to reframe the problem so that a group-by can be used, or change how the data is defined; a sketch of that idea follows.
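For illustration, one way that group-by reframing might look in PySpark (Spark 3.0+, a sketch only): collect each id's rows into a pandas group and apply the reset rule directly. The ordering column ts is hypothetical, since the sample data has no column that fixes row order within an id:

import pandas as pd

def reset_cumsum_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # "ts" is a hypothetical ordering column; without one, row order
    # within a group is not guaranteed
    pdf = pdf.sort_values("ts")
    running, out = 0, []
    for v in pdf["V"]:
        running = v if running >= 25 else running + v
        out.append(running)
    return pdf.assign(runningTotalUnder25=out)

result = df.groupBy("id").applyInPandas(
    reset_cumsum_group,
    schema="id string, V long, ts long, runningTotalUnder25 long")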