spark scala中的累积函数
Cumulative function in spark scala
我试过这个来计算累积值,但如果日期字段相同,这些值将添加到累积字段中,有人可以建议解决方案类似于 this question
val windowval = (Window.partitionBy($"userID").orderBy($"lastModified")
.rangeBetween(Window.unboundedPreceding, 0))
val df_w_cumsum = ms1_userlogRewards.withColumn("totalRewards", sum($"noOfJumps").over(windowval)).orderBy($"lastModified".asc)
df_w_cumsum.filter($"batchType".isNull).filter($"userID"==="355163").select($"userID", $"noOfJumps", $"totalRewards",$"lastModified").show()
我想你想按用户 ID 和时间戳求和。
因此,您需要按用户 ID 和日期进行分区,并使用 window 函数进行 sym,如下所示:
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("userID", "lastModified")
df.withColumn("cumulativeSum", sum(col("noOfJumps").over(window))
请注意,您的第一个 totalRewards=147
是前一个值 49
+ 时间戳为“2019-08-07 18:25:06”的所有值的总和:49 + (36 + 0 + 60 + 2
) = 147
.
第一个选项是聚合所有具有相同时间戳的值,例如groupBy($"userId", $"lastModified").agg(sum($"noOfJumps").as("noOfJumps"))
(或类似的东西)然后 运行 您的总和。这将完全删除重复的时间戳。
第二个选项是先使用 row_number 定义具有相同 lastModified
字段的行之间的顺序,然后 运行 您与 .orderBy($"lastModified, $"row_number")
的总和(或之类的东西)。这应该保留所有记录并在整个过程中为您提供部分总结:totalRewards = 49 -> 85 -> 85 -> 145 -> 147
(或类似的内容,具体取决于 row_number 定义的顺序)
我试过这个来计算累积值,但如果日期字段相同,这些值将添加到累积字段中,有人可以建议解决方案类似于 this question
val windowval = (Window.partitionBy($"userID").orderBy($"lastModified")
.rangeBetween(Window.unboundedPreceding, 0))
val df_w_cumsum = ms1_userlogRewards.withColumn("totalRewards", sum($"noOfJumps").over(windowval)).orderBy($"lastModified".asc)
df_w_cumsum.filter($"batchType".isNull).filter($"userID"==="355163").select($"userID", $"noOfJumps", $"totalRewards",$"lastModified").show()
我想你想按用户 ID 和时间戳求和。 因此,您需要按用户 ID 和日期进行分区,并使用 window 函数进行 sym,如下所示:
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("userID", "lastModified")
df.withColumn("cumulativeSum", sum(col("noOfJumps").over(window))
请注意,您的第一个 totalRewards=147
是前一个值 49
+ 时间戳为“2019-08-07 18:25:06”的所有值的总和:49 + (36 + 0 + 60 + 2
) = 147
.
第一个选项是聚合所有具有相同时间戳的值,例如groupBy($"userId", $"lastModified").agg(sum($"noOfJumps").as("noOfJumps"))
(或类似的东西)然后 运行 您的总和。这将完全删除重复的时间戳。
第二个选项是先使用 row_number 定义具有相同 lastModified
字段的行之间的顺序,然后 运行 您与 .orderBy($"lastModified, $"row_number")
的总和(或之类的东西)。这应该保留所有记录并在整个过程中为您提供部分总结:totalRewards = 49 -> 85 -> 85 -> 145 -> 147
(或类似的内容,具体取决于 row_number 定义的顺序)