函数式编程方法来计算滚动总和之类的东西

Functional Programming way to calculate something like a rolling sum

假设我有一个数字列表:

val list = List(4,12,3,6,9)

对于列表中的每个元素,我需要找到滚动总和,即最终输出应该是:

List(4, 16, 19, 25, 34)

是否有任何转换允许我们将列表的两个元素(当前和上一个)作为输入并基于两者进行计算? 类似于 map(initial)((curr,prev) => curr+prev)

我想在不维护任何共享全局状态的情况下实现这一目标。

编辑:我希望能够对 RDD 进行相同类型的计算。

您可以使用scanLeft

list.scanLeft(0)(_ + _).tail

我不知道spark RDD支持什么功能,所以我不确定这是否满足你的条件,因为我不知道是否支持zipWithIndex(如果答案没有帮助,请告诉我通过评论知道我会删除我的答案):

list.zipWithIndex.map{x => list.take(x._2+1).sum}

这段代码对我有用,它总结了元素。它获取列表元素的索引,然后在列表中添加相应的前 n 个元素(注意 +1,因为 zipWithIndex 从 0 开始)。

打印时,我得到以下信息:

List(4, 16, 19, 25, 34)

下面的 cumSum 方法应该适用于任何 RDD[N],其中 N 有一个隐含的 Numeric[N] 可用,例如IntLongBigIntDouble

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def cumSum[N : Numeric : ClassTag](rdd: RDD[N]): RDD[N] = {
  val num = implicitly[Numeric[N]]
  val nPartitions = rdd.partitions.length

  val partitionCumSums = rdd.mapPartitionsWithIndex((index, iter) => 
    if (index == nPartitions - 1) Iterator.empty
    else Iterator.single(iter.foldLeft(num.zero)(num.plus))
  ).collect
   .scanLeft(num.zero)(num.plus)

  rdd.mapPartitionsWithIndex((index, iter) => 
    if (iter.isEmpty) iter
    else {
      val start = num.plus(partitionCumSums(index), iter.next)
      iter.scanLeft(start)(num.plus)
    }
  )
}

将此方法推广到具有 "zero" 的任何关联二元运算符(即任何幺半群)应该相当简单。关联性是并行化的关键。如果没有这种关联性,您通常会以串行方式通过 RDD 的条目陷入 运行。