有没有更好的方法来减少 RDD[Array[Double]] 上的操作

Is there a better way for reduce operation on RDD[Array[Double]]

我想减少 RDD[Array[Double]] 以便数组的每个元素都将与下一个数组的相同元素相加。 我暂时使用这段代码:

var rdd1 = RDD[Array[Double]]

var coord = rdd1.reduce( (x,y) => { (x, y).zipped.map(_+_) })

有没有更好的方法来提高效率,因为它会造成伤害。

使用 zipped.map 非常低效,因为它会创建大量临时对象并将双打装箱。

如果你使用spire,你可以这样做

> import spire.implicits._
> val rdd1 = sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0)))
> var coord = rdd1.reduce( _ + _)
res1: Array[Double] = Array(4.0, 6.0)

这看起来更好看,而且应该更有效率。

Spire 是 spark 的依赖项,因此您应该能够在没有任何额外依赖项的情况下执行上述操作。至少它与这里的 spark 1.3.1 的 spark-shell 一起工作。

这适用于元素类型可用的 AdditiveSemigroup 类型类实例的任何数组。在本例中,元素类型为 Double。 Spire typeclasses @specialized for double,因此不会在任何地方进行装箱。

如果你真的想知道是怎么回事让这个工作,你必须使用 reify:

> import scala.reflect.runtime.{universe => u}
> val a = Array(1.0, 2.0)
> val b = Array(3.0, 4.0)
> u.reify { a + b }

res5: reflect.runtime.universe.Expr[Array[Double]] = Expr[scala.Array[Double]](
  implicits.additiveSemigroupOps(a)(
    implicits.ArrayNormedVectorSpace(
      implicits.DoubleAlgebra, 
      implicits.DoubleAlgebra,
      Predef.this.implicitly)).$plus(b))

所以加法有效,因为 Array[Double] 有一个 AdditiveSemigroup 实例。

我假设担心的是您有非常大的 Array[Double] 并且所写的转换不会分配它们的加法。如果是这样,你可以做类似(未经测试)的事情:

// map Array[Double] to (index, double)
val rdd2 = rdd1.flatMap(a => a.zipWithIndex.map(t => (t._2,t._1))
// get the sum for each index
val reduced = rdd2.reduceByKey(_ + _)
// key everything the same to get a single iterable in groubByKey
val groupAll = reduced.map(t => ("constKey", (t._1, t._2)
// get the doubles back together into an array
val coord = groupAll.groupByKey { (k,vs) => 
                     vs.toList.sortBy(_._1).toArray.map(_._2) }