函数式编程方法来计算滚动总和之类的东西
Functional Programming way to calculate something like a rolling sum
假设我有一个数字列表:
val list = List(4,12,3,6,9)
对于列表中的每个元素,我需要找到滚动总和,即最终输出应该是:
List(4, 16, 19, 25, 34)
是否有任何转换允许我们将列表的两个元素(当前和上一个)作为输入并基于两者进行计算?
类似于 map(initial)((curr,prev) => curr+prev)
我想在不维护任何共享全局状态的情况下实现这一目标。
编辑:我希望能够对 RDD 进行相同类型的计算。
您可以使用scanLeft
list.scanLeft(0)(_ + _).tail
我不知道spark RDD支持什么功能,所以我不确定这是否满足你的条件,因为我不知道是否支持zipWithIndex(如果答案没有帮助,请告诉我通过评论知道我会删除我的答案):
list.zipWithIndex.map{x => list.take(x._2+1).sum}
这段代码对我有用,它总结了元素。它获取列表元素的索引,然后在列表中添加相应的前 n 个元素(注意 +1,因为 zipWithIndex 从 0 开始)。
打印时,我得到以下信息:
List(4, 16, 19, 25, 34)
下面的 cumSum
方法应该适用于任何 RDD[N]
,其中 N
有一个隐含的 Numeric[N]
可用,例如Int
、Long
、BigInt
、Double
等
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
def cumSum[N : Numeric : ClassTag](rdd: RDD[N]): RDD[N] = {
val num = implicitly[Numeric[N]]
val nPartitions = rdd.partitions.length
val partitionCumSums = rdd.mapPartitionsWithIndex((index, iter) =>
if (index == nPartitions - 1) Iterator.empty
else Iterator.single(iter.foldLeft(num.zero)(num.plus))
).collect
.scanLeft(num.zero)(num.plus)
rdd.mapPartitionsWithIndex((index, iter) =>
if (iter.isEmpty) iter
else {
val start = num.plus(partitionCumSums(index), iter.next)
iter.scanLeft(start)(num.plus)
}
)
}
将此方法推广到具有 "zero" 的任何关联二元运算符(即任何幺半群)应该相当简单。关联性是并行化的关键。如果没有这种关联性,您通常会以串行方式通过 RDD
的条目陷入 运行。
假设我有一个数字列表:
val list = List(4,12,3,6,9)
对于列表中的每个元素,我需要找到滚动总和,即最终输出应该是:
List(4, 16, 19, 25, 34)
是否有任何转换允许我们将列表的两个元素(当前和上一个)作为输入并基于两者进行计算?
类似于 map(initial)((curr,prev) => curr+prev)
我想在不维护任何共享全局状态的情况下实现这一目标。
编辑:我希望能够对 RDD 进行相同类型的计算。
您可以使用scanLeft
list.scanLeft(0)(_ + _).tail
我不知道spark RDD支持什么功能,所以我不确定这是否满足你的条件,因为我不知道是否支持zipWithIndex(如果答案没有帮助,请告诉我通过评论知道我会删除我的答案):
list.zipWithIndex.map{x => list.take(x._2+1).sum}
这段代码对我有用,它总结了元素。它获取列表元素的索引,然后在列表中添加相应的前 n 个元素(注意 +1,因为 zipWithIndex 从 0 开始)。
打印时,我得到以下信息:
List(4, 16, 19, 25, 34)
下面的 cumSum
方法应该适用于任何 RDD[N]
,其中 N
有一个隐含的 Numeric[N]
可用,例如Int
、Long
、BigInt
、Double
等
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
def cumSum[N : Numeric : ClassTag](rdd: RDD[N]): RDD[N] = {
val num = implicitly[Numeric[N]]
val nPartitions = rdd.partitions.length
val partitionCumSums = rdd.mapPartitionsWithIndex((index, iter) =>
if (index == nPartitions - 1) Iterator.empty
else Iterator.single(iter.foldLeft(num.zero)(num.plus))
).collect
.scanLeft(num.zero)(num.plus)
rdd.mapPartitionsWithIndex((index, iter) =>
if (iter.isEmpty) iter
else {
val start = num.plus(partitionCumSums(index), iter.next)
iter.scanLeft(start)(num.plus)
}
)
}
将此方法推广到具有 "zero" 的任何关联二元运算符(即任何幺半群)应该相当简单。关联性是并行化的关键。如果没有这种关联性,您通常会以串行方式通过 RDD
的条目陷入 运行。