如何将 Vector 的列或序列转换为 SparseMatrix?

How to convert a column or sequence of Vectors to a SparseMatrix?

正如标题所说,我有一个向量序列(在 DataFrame 列中,但这可以转换为 RDD 或使用 .collect() 转换为序列)。我想将这些向量收集到本地 SparseMatrix 中。对于 Spark 1.6.3 的 back-compatibility,我需要它是 SparseMatrix 的 mllib 版本。

收集为一系列 SparseVectors,我得到

val seq_of_vectors = df_with_vectors.select("sparse").map(_.getAs[SparseVector](0)).collect()
seq_of_vectors: Array[org.apache.spark.mllib.linalg.SparseVector] = ...

我可以轻松制作 RowMatrix,但我也没有看到任何将 RowMatrix 转换为本地矩阵的方法。

val exampleMatrix = new RowMatrix(df_with_vectors.select("sparse").rdd.map(_.getAs[SparseVector](0)))
exampleMatrix: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@2e6273dc

给定一个 SparseVector 对象序列,形式为

seq_of_vectors: Array[org.apache.spark.mllib.linalg.SparseVector] = 
    Array(..., (262144,[136034,155107,166596],[0.8164965809277259,0.40824829046386296,0.40824829046386296]), ...

我们使用以下方法转换为(行、列、值)的坐标列表元组:

val coo = (seq_of_vectors.map(_.numNonzeros).zipWithIndex.flatMap{case (cnt, idx) => Array.fill(cnt)(idx) },
    seq_of_vectors.map(_.indices).flatten,
    seq_of_vectors.map(_.values).flatten
).zipped.toArray

coo: Array[(Int, Int, Double)] = 
    Array( ..., (28,136034,0.8164965809277259), (28,155107,0.40824829046386296), (28,166596,0.40824829046386296), ...

然后我们使用SparseMatrixfromCOO函数。行数是传递的向量数;而列数是最长的 SparseVector 的长度:

SparseMatrix.fromCOO(seq_of_vectors.length,
    seq_of_vectors.map(_.size).max,
    coo)

res223: org.apache.spark.mllib.linalg.SparseMatrix = 
84 x 262144 CSCMatrix
...
(28,136034) 0.8164965809277259
...
(28,155107) 0.40824829046386296
...
(28,166596) 0.40824829046386296
...