如何使用 Scala 添加两列稀疏向量?
How to add two columns of SparseVectors using Scala?
给定 SparseVector 对象的两个 DataFrame 列,如何将这两列相加(即向量相加)以创建新列?
类似
df.columns
df: org.apache.spark.sql.DataFrame = [v1: SparseVector, v2: SparseVector]
df.withColumn("v3", ADD_COL_FUNCTION(col(v1), col(v2)))
Spark 中没有内置 SparseVectors 的加法函数。 DenseVector
对象可以通过将它们变成数组来处理,但是对于 SparseVector
这可能是一个内存杀手。您可以将 SparseVectors 解释为地图,然后 'add' 将地图放在一起。
import org.apache.spark.mllib.linalg.{SparseVector, Vectors, Vector}
def addVecCols(v1: SparseVector, v2: SparseVector): Vector = {
val map1: Map[Int, Double] = (v1.indices zip v1.values).toMap
Vectors.sparse(v1. size,
(map1 ++ (v2.indices zip v2.values).toMap)
.map{ case (k, v) => k -> (v + map1.getOrElse(k, 0d))}
.toList
)
val addVecUDF = udf((v1: SparseVector, v2: SparseVector) => addVecCols(v1, v2))
注意在Spark 1.6中,Vectors.sparse
的return类型是Vector
,而在Spark2.X中是SparseVector
,所以调整return 类型 addVecCols
适当。此外,在 2.X 中,您可以使用 ml
库而不是 mllib
库。
在数据帧上使用它是
val outDF = inDF.withColumn("added", addVecUDF(col("one_vector"), col("other_vector")))
这是我们对这个问题的最终解决方案。
首先,我们实现了 中提供的 Spark 和 Breeze 向量之间的隐式转换(注意评论中的错误修复)。这提供了下面代码中使用的 asBreeze
和 fromBreeze
转换。
然后我们定义了一个允许添加稀疏向量列的函数:
def addVectors(v1Col: String, v2Col: String, outputCol: String)
: DataFrame => DataFrame = {
df: DataFrame => {
def add(v1: SparkVector, v2: SparkVector): SparkVector =
(v1.asBreeze + v2.asBreeze).fromBreeze
val func = udf((v1: SparkVector, v2: SparkVector) => add(v1, v2))
df.withColumn(outputCol, func(col(v1Col), col(v2Col)))
}
}
调用此函数使用:
df.transform(addVectors(col1Name, col2name, colOutName))
当然,您可能希望包括一些列名称存在的检查,并确保输出列不会覆盖您不希望覆盖的任何内容。
给定 SparseVector 对象的两个 DataFrame 列,如何将这两列相加(即向量相加)以创建新列?
类似
df.columns
df: org.apache.spark.sql.DataFrame = [v1: SparseVector, v2: SparseVector]
df.withColumn("v3", ADD_COL_FUNCTION(col(v1), col(v2)))
Spark 中没有内置 SparseVectors 的加法函数。 DenseVector
对象可以通过将它们变成数组来处理,但是对于 SparseVector
这可能是一个内存杀手。您可以将 SparseVectors 解释为地图,然后 'add' 将地图放在一起。
import org.apache.spark.mllib.linalg.{SparseVector, Vectors, Vector}
def addVecCols(v1: SparseVector, v2: SparseVector): Vector = {
val map1: Map[Int, Double] = (v1.indices zip v1.values).toMap
Vectors.sparse(v1. size,
(map1 ++ (v2.indices zip v2.values).toMap)
.map{ case (k, v) => k -> (v + map1.getOrElse(k, 0d))}
.toList
)
val addVecUDF = udf((v1: SparseVector, v2: SparseVector) => addVecCols(v1, v2))
注意在Spark 1.6中,Vectors.sparse
的return类型是Vector
,而在Spark2.X中是SparseVector
,所以调整return 类型 addVecCols
适当。此外,在 2.X 中,您可以使用 ml
库而不是 mllib
库。
在数据帧上使用它是
val outDF = inDF.withColumn("added", addVecUDF(col("one_vector"), col("other_vector")))
这是我们对这个问题的最终解决方案。
首先,我们实现了 asBreeze
和 fromBreeze
转换。
然后我们定义了一个允许添加稀疏向量列的函数:
def addVectors(v1Col: String, v2Col: String, outputCol: String)
: DataFrame => DataFrame = {
df: DataFrame => {
def add(v1: SparkVector, v2: SparkVector): SparkVector =
(v1.asBreeze + v2.asBreeze).fromBreeze
val func = udf((v1: SparkVector, v2: SparkVector) => add(v1, v2))
df.withColumn(outputCol, func(col(v1Col), col(v2Col)))
}
}
调用此函数使用:
df.transform(addVectors(col1Name, col2name, colOutName))
当然,您可能希望包括一些列名称存在的检查,并确保输出列不会覆盖您不希望覆盖的任何内容。