如何在 spark scala 的 Breeze 矩阵数组中找到相同单元格的平均值?

How to find the mean of same cells in an array of Breeze Matrices in spark scala?

我有一个 Array[DenseMatrix[Double]],我想找到相同单元格的平均值。例如:

Array[0]: 
  +---+---+
  | 1 | 2 |
  +---+---+ 
  | 2 | 3 |
  +---+---+

Array[1]: 
  +---+---+
  | 1 | 1 |
  +---+---+ 
  | 3 | 1 |
  +---+---+

Array[2]:
  +---+---+
  | 2 | 3 |
  +---+---+ 
  | 4 | 1 |
  +---+---+

Result: DenseMatrix: 
  +----+----+
  | 1.3|  2 |
  +----+----+ 
  |  3 | 1.6|
  +----+----+

这不是 RDD,因为我希望此代码 运行 在驱动程序上。

Spark Scala 对我来说是新手,我能想到的是:

  val ar = rdd.collect().foreach(x=> {
    val matr = DenseMatrix.zeros[Double](C,2)
    matr := x/M
    matr
  })

但是不知道对不对,因为它认为是closure。此外,它需要 DenseMatrix[Double] return 类型,但我收到错误,因为如果 RDD 为空,我就没有。有什么想法吗?

你可以这样使用fold

val rdd    = sc.makeRDD(Seq(1, 2, 3))
val zero   = 0
val sum    = rdd.fold(zero)((l, r) => l + r) // = (((0 + 1) + 2) + 3)
val result = sum / rdd.count()

使用 breeze 矩阵 时,您可以使用 + 对两个不同的矩阵进行逐元素相加。这意味着您唯一需要做的就是将所有矩阵加在一起,然后除以矩阵的数量。可以这样操作:

import breeze.linalg.DenseMatrix

val arr = Array(new DenseMatrix(2, 2, Array(1.0,2,2,3)), 
        new DenseMatrix(2, 2, Array(1.0,3,1,1)),
        new DenseMatrix(2, 2, Array(2.0,4,3,1)))

val dm: DenseMatrix = arr.reduce(_ + _).map(_ / arr.length)

生成的矩阵将具有相同单元格的平均值。


这在使用 Sparkml.linalg.DenseMatrix 矩阵时也是可能的,但是,它有点复杂,因为那里不是简单的加法。

val numCols = arr.head.numCols
val numRows = arr.head.numRows
val values = arr.map(_.values)
  .reduce((_, _).zipped.map(_ + _))
  .map(_ / arr.length)

val dm = new DenseMatrix(numCols, numRows, values)