How to compute the dot product of two distributed RowMatrix in Apache Spark?

Let Q be a distributed RowMatrix in Spark. I want to calculate the cross product of Q with its transpose Q'.

However, although RowMatrix does have a multiply() method, it can only accept a local matrix as its argument.
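For reference, a minimal sketch of that limitation (someRowMatrix and the local identity matrix below are placeholders of my own, not from the original question):

import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// RowMatrix.multiply accepts only a local Matrix, not another distributed matrix
val local: Matrix = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0)) // 2x2 identity (column-major)
val result: RowMatrix = someRowMatrix.multiply(local)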

Code illustration (Scala):

val phi = new RowMatrix(phiRDD)            // phiRDD is an instance of RDD[Vector]
val phiTranspose = transposeRowMatrix(phi) // transposeRowMatrix()
                                           // returns the transpose of a RowMatrix
val crossMat = ?                           // phi * phiTranspose
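The transposeRowMatrix() helper above is user-defined, not part of MLlib. For completeness, here is a minimal sketch of one possible implementation (my own, assuming dense rows):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

def transposeRowMatrix(m: RowMatrix): RowMatrix = {
  val transposedRows = m.rows
    .zipWithIndex()                               // attach a row index to each row
    .flatMap { case (row, rowIdx) =>
      row.toArray.zipWithIndex.map { case (value, colIdx) =>
        (colIdx, (rowIdx, value))                 // key each entry by its column
      }
    }
    .groupByKey()                                 // gather the entries of each column
    .sortByKey()                                  // order the transposed rows
    .map { case (_, entries) =>
      Vectors.dense(entries.toSeq.sortBy(_._1).map(_._2).toArray)
    }
  new RowMatrix(transposedRows)
}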

Note that I want to perform the dot product of two distributed RowMatrices, not the dot product of a distributed RowMatrix with a local one.

One solution is to use an IndexedRowMatrix, as follows:

val phi = new IndexedRowMatrix(phiRDD)  // phiRDD is an instance of RDD[IndexedRow]
val phiTranspose = transposeMatrix(phi) // transposeMatrix()
                                        // returns the transpose of a Matrix
val crossMat = phi.toBlockMatrix().multiply( phiTranspose.toBlockMatrix()
                                             ).toIndexedRowMatrix()
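transposeMatrix() is likewise user-defined. One simple sketch (mine, not the asker's) obtains the transpose of an IndexedRowMatrix by round-tripping through BlockMatrix, which has a built-in transpose:

import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix

def transposeMatrix(m: IndexedRowMatrix): IndexedRowMatrix =
  m.toBlockMatrix().transpose.toIndexedRowMatrix()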

However, I want to use RowMatrix methods such as tallSkinnyQR(), which means I have to convert crossMat to a RowMatrix with the .toRowMatrix() method:

val crossRowMat = crossMat.toRowMatrix()

and finally I can apply

crossRowMat.tallSkinnyQR()
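For reference, tallSkinnyQR() returns a QRDecomposition whose Q factor is a RowMatrix and whose R factor is a local Matrix; a sketch of reading the result (assuming crossRowMat as above):

val qr = crossRowMat.tallSkinnyQR(computeQ = true)
val q = qr.Q // distributed RowMatrix with orthonormal columns
val r = qr.R // local upper-triangular Matrix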

But this process includes many conversions between distributed matrix types, which, according to my understanding of the MLlib Programming Guide, are expensive:

It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive.

Would someone please elaborate on this?

The only distributed matrices that support matrix-matrix multiplication are BlockMatrices. You have to convert your data accordingly; artificial indices are good enough:

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// Attach artificial row indices, convert to a BlockMatrix,
// and multiply the result by its own transpose
new IndexedRowMatrix(
  rowMatrix.rows.zipWithIndex.map(x => IndexedRow(x._2, x._1))
).toBlockMatrix match { case m => m.multiply(m.transpose) }
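Put together as a self-contained sketch (the sample data and session setup below are illustrative additions of mine, not part of the original answer):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cross-product").getOrCreate()
val sc = spark.sparkContext

// A small 3x2 RowMatrix built from dense vectors
val rowMatrix = new RowMatrix(sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0),
  Vectors.dense(5.0, 6.0)
)))

// Attach artificial row indices so the rows can be placed into blocks
val block = new IndexedRowMatrix(
  rowMatrix.rows.zipWithIndex.map { case (row, idx) => IndexedRow(idx, row) }
).toBlockMatrix()

// Q * Q' as a BlockMatrix product; convert back only if a RowMatrix is needed
val crossMat = block.multiply(block.transpose).toIndexedRowMatrix().toRowMatrix()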

I used the algorithm listed on this page, which moves the multiplication problem from a dot-product problem to a distributed scalar-product problem by using vector outer products:

The outer product between two vectors is the scalar product of the second vector with all the elements in the first vector, resulting in a matrix
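As a concrete illustration (a toy example of my own, not from the cited page), the outer product of [1, 2] with [3, 4] is the 2x2 matrix [[3, 4], [6, 8]]:

// Each element of the first vector scales the entire second vector,
// producing one row of the resulting matrix
val u = Array(1.0, 2.0)
val v = Array(3.0, 4.0)
val outer = u.map(ui => v.map(vi => ui * vi))
// outer: Array(Array(3.0, 4.0), Array(6.0, 8.0))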

My own multiplication function for RowMatrices (it could be optimized further) ended up like this:

import org.apache.spark.mllib.linalg.{DenseVector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.sql.SparkSession

def multiplyRowMatrices(m1: RowMatrix, m2: RowMatrix)(implicit ctx: SparkSession): RowMatrix = {
  // Zip m1 columns with m2 rows
  val m1Cm2R = transposeRowMatrix(m1).rows.zip(m2.rows)

  // Apply a scalar product between each entry in the m1 column and the m2 row,
  // i.e. compute the outer product of each column/row pair
  val scalar = m1Cm2R.map {
    case (column: DenseVector, row: DenseVector) =>
      column.toArray.map { columnValue =>
        row.toArray.map { rowValue =>
          columnValue * rowValue
        }
      }
  }

  // Add all the resulting matrices point-wise
  // (note: reduce materializes the full result on the driver)
  val sum = scalar.reduce {
    case (matrix1, matrix2) =>
      matrix1.zip(matrix2).map {
        case (array1, array2) =>
          array1.zip(array2).map {
            case (value1, value2) => value1 + value2
          }
      }
  }

  new RowMatrix(ctx.sparkContext.parallelize(sum.map(array => Vectors.dense(array))))
}

Afterwards I tested both approaches, my own function and the BlockMatrix approach, on a single machine with a 300*10 matrix.

Using my own function:

val PhiMat = new RowMatrix(phi)
val TphiMat = transposeRowMatrix(PhiMat)
val product = multiplyRowMatrices(PhiMat,TphiMat)

Using the matrix conversions:

val MatRow = new RowMatrix(phi)
val MatBlock = new IndexedRowMatrix(
  MatRow.rows.zipWithIndex.map(x => IndexedRow(x._2, x._1))
).toBlockMatrix()
val TMatBlock = MatBlock.transpose
val productMatBlock = MatBlock.multiply(TMatBlock)
val productMatRow = productMatBlock.toIndexedRowMatrix().toRowMatrix()

The first approach spanned 1 job with 5 stages and took 2 s in total. The second approach spanned 4 jobs, three with one stage and one with two stages, and took 0.323 s in total. In addition, the second approach outperformed the first with respect to the Shuffle Read/Write size.

But I am still confused by the MLlib Programming Guide statement:

It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive.