添加两个 RDD[mllib.linalg.Vector]'s
Addition of two RDD[mllib.linalg.Vector]'s
我需要添加存储在两个文件中的两个矩阵。
latest1.txt
和latest2.txt
的内容有下一个str:
1 2 3
4 5 6
7 8 9
我正在阅读这些文件如下:
scala> val rows = sc.textFile(“latest1.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}
scala> val r1 = rows
r1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14
scala> val rows = sc.textFile(“latest2.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}
scala> val r2 = rows
r2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14
我要添加r1,r2。那么,有什么办法可以在 Apache-Spark 中添加这两个 RDD[mllib.linalg.Vector]
。
这其实是个好问题。我经常使用 mllib,并没有意识到这些基本的线性代数运算并不容易获得。
重点是基础 breeze 向量具有您所期望的所有线性代数操作 - 当然包括您特别提到的基本元素明智的加法。
然而,breeze 实现通过以下方式对外界隐藏:
[private mllib]
那么,从外部 world/public API 的角度来看,我们如何访问这些原语?
其中一些已经公开:例如平方和:
/**
* Returns the squared distance between two Vectors.
* @param v1 first Vector.
* @param v2 second Vector.
* @return squared distance between two Vectors.
*/
def sqdist(v1: Vector, v2: Vector): Double = {
...
}
然而,此类可用方法的选择是有限的 - 实际上 不 包括基本运算,包括元素明智的加法、减法、乘法等。
所以这是我能看到的最好的:
- 将向量转换为 breeze:
- 在breeze
中执行矢量运算
- 从 breeze 转换回 mllib Vector
下面是一些示例代码:
val v1 = Vectors.dense(1.0, 2.0, 3.0)
val v2 = Vectors.dense(4.0, 5.0, 6.0)
val bv1 = new DenseVector(v1.toArray)
val bv2 = new DenseVector(v2.toArray)
val vectout = Vectors.dense((bv1 + bv2).toArray)
vectout: org.apache.spark.mllib.linalg.Vector = [5.0,7.0,9.0]
以下代码公开了 Spark 中的 asBreeze 和 fromBreeze 方法。此解决方案支持 SparseVector
与使用 vector.toArray
相比。请注意,Spark 将来可能会更改其 API,并且已经将 toBreeze
重命名为 asBreeze
。
package org.apache.spark.mllib.linalg
import breeze.linalg.{Vector => BV}
import org.apache.spark.sql.functions.udf
/** expose vector.toBreeze and Vectors.fromBreeze
*/
object VectorUtils {
def fromBreeze(breezeVector: BV[Double]): Vector = {
Vectors.fromBreeze( breezeVector )
}
def asBreeze(vector: Vector): BV[Double] = {
// this is vector.asBreeze in Spark 2.0
vector.toBreeze
}
val addVectors = udf {
(v1: Vector, v2: Vector) => fromBreeze( asBreeze(v1) + asBreeze(v2) )
}
}
有了这个你可以做到df.withColumn("xy", addVectors($"x", $"y"))
。
我需要添加存储在两个文件中的两个矩阵。
latest1.txt
和latest2.txt
的内容有下一个str:
1 2 3 4 5 6 7 8 9
我正在阅读这些文件如下:
scala> val rows = sc.textFile(“latest1.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}
scala> val r1 = rows
r1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14
scala> val rows = sc.textFile(“latest2.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}
scala> val r2 = rows
r2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14
我要添加r1,r2。那么,有什么办法可以在 Apache-Spark 中添加这两个 RDD[mllib.linalg.Vector]
。
这其实是个好问题。我经常使用 mllib,并没有意识到这些基本的线性代数运算并不容易获得。
重点是基础 breeze 向量具有您所期望的所有线性代数操作 - 当然包括您特别提到的基本元素明智的加法。
然而,breeze 实现通过以下方式对外界隐藏:
[private mllib]
那么,从外部 world/public API 的角度来看,我们如何访问这些原语?
其中一些已经公开:例如平方和:
/**
* Returns the squared distance between two Vectors.
* @param v1 first Vector.
* @param v2 second Vector.
* @return squared distance between two Vectors.
*/
def sqdist(v1: Vector, v2: Vector): Double = {
...
}
然而,此类可用方法的选择是有限的 - 实际上 不 包括基本运算,包括元素明智的加法、减法、乘法等。
所以这是我能看到的最好的:
- 将向量转换为 breeze:
- 在breeze 中执行矢量运算
- 从 breeze 转换回 mllib Vector
下面是一些示例代码:
val v1 = Vectors.dense(1.0, 2.0, 3.0)
val v2 = Vectors.dense(4.0, 5.0, 6.0)
val bv1 = new DenseVector(v1.toArray)
val bv2 = new DenseVector(v2.toArray)
val vectout = Vectors.dense((bv1 + bv2).toArray)
vectout: org.apache.spark.mllib.linalg.Vector = [5.0,7.0,9.0]
以下代码公开了 Spark 中的 asBreeze 和 fromBreeze 方法。此解决方案支持 SparseVector
与使用 vector.toArray
相比。请注意,Spark 将来可能会更改其 API,并且已经将 toBreeze
重命名为 asBreeze
。
package org.apache.spark.mllib.linalg
import breeze.linalg.{Vector => BV}
import org.apache.spark.sql.functions.udf
/** expose vector.toBreeze and Vectors.fromBreeze
*/
object VectorUtils {
def fromBreeze(breezeVector: BV[Double]): Vector = {
Vectors.fromBreeze( breezeVector )
}
def asBreeze(vector: Vector): BV[Double] = {
// this is vector.asBreeze in Spark 2.0
vector.toBreeze
}
val addVectors = udf {
(v1: Vector, v2: Vector) => fromBreeze( asBreeze(v1) + asBreeze(v2) )
}
}
有了这个你可以做到df.withColumn("xy", addVectors($"x", $"y"))
。