从 RDD 的元素创建一个 SparseVector
Create a SparseVector from the elements of RDD
使用 Spark,我在 Scala 中有一个 val rdd = RDD[(x: Int, y:Int), cov:Double]
类型的数据结构,其中 RDD 的每个元素代表矩阵的一个元素,x
代表行,y
表示列和 cov
表示元素的值:
我需要从这个矩阵的行创建 SparseVectors。所以我决定首先将 rdd 转换为 RDD[x: Int, (y:Int, cov:Double)]
然后使用 groupByKey 将特定行的所有元素放在一起,如下所示:
val rdd2 = rdd.map{case ((x,y),cov) => (x, (y, cov))}.groupByKey()
现在我需要创建 SparseVectors:
val N = 7 //Vector Size
val spvec = {(x: Int,y: Iterable[(Int, Double)]) => new SparseVector(N.toLong, Array(y.map(el => el._1.toInt)), Array(y.map(el => el._2.toDouble)))}
val vecs = rdd2.map(spvec)
然而,这是弹出的错误。
type mismatch; found :Iterable[Int] required:Int
type mismatch; found :Iterable[Double] required:Double
我猜测 y.map(el => el._1.toInt)
正在返回一个无法应用数组的可迭代对象。如果有人可以帮助解决此问题,我将不胜感激。
最简单的解决方案是转换为 RowMatrix
:
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val rdd: RDD[((Int, Int), Double)] = ???
val vs: RDD[org.apache.spark.mllib.linalg.SparseVector]= new CoordinateMatrix(
rdd.map{
case ((x, y), cov) => MatrixEntry(x, y, cov)
}
).toRowMatrix.rows.map(_.toSparse)
如果您想保留行索引,您可以使用 toIndexedRowMatrix
代替:
import org.apache.spark.mllib.linalg.distributed.IndexedRow
new CoordinateMatrix(
rdd.map{
case ((x, y), cov) => MatrixEntry(x, y, cov)
}
).toIndexedRowMatrix.rows.map { case IndexedRow(i, vs) => (i, vs.toSparse) }
使用 Spark,我在 Scala 中有一个 val rdd = RDD[(x: Int, y:Int), cov:Double]
类型的数据结构,其中 RDD 的每个元素代表矩阵的一个元素,x
代表行,y
表示列和 cov
表示元素的值:
我需要从这个矩阵的行创建 SparseVectors。所以我决定首先将 rdd 转换为 RDD[x: Int, (y:Int, cov:Double)]
然后使用 groupByKey 将特定行的所有元素放在一起,如下所示:
val rdd2 = rdd.map{case ((x,y),cov) => (x, (y, cov))}.groupByKey()
现在我需要创建 SparseVectors:
val N = 7 //Vector Size
val spvec = {(x: Int,y: Iterable[(Int, Double)]) => new SparseVector(N.toLong, Array(y.map(el => el._1.toInt)), Array(y.map(el => el._2.toDouble)))}
val vecs = rdd2.map(spvec)
然而,这是弹出的错误。
type mismatch; found :Iterable[Int] required:Int
type mismatch; found :Iterable[Double] required:Double
我猜测 y.map(el => el._1.toInt)
正在返回一个无法应用数组的可迭代对象。如果有人可以帮助解决此问题,我将不胜感激。
最简单的解决方案是转换为 RowMatrix
:
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val rdd: RDD[((Int, Int), Double)] = ???
val vs: RDD[org.apache.spark.mllib.linalg.SparseVector]= new CoordinateMatrix(
rdd.map{
case ((x, y), cov) => MatrixEntry(x, y, cov)
}
).toRowMatrix.rows.map(_.toSparse)
如果您想保留行索引,您可以使用 toIndexedRowMatrix
代替:
import org.apache.spark.mllib.linalg.distributed.IndexedRow
new CoordinateMatrix(
rdd.map{
case ((x, y), cov) => MatrixEntry(x, y, cov)
}
).toIndexedRowMatrix.rows.map { case IndexedRow(i, vs) => (i, vs.toSparse) }