如何使用 Scala 添加两列稀疏向量?

How to add two columns of SparseVectors using Scala?

给定 SparseVector 对象的两个 DataFrame 列,如何将这两列相加(即向量相加)以创建新列?

类似

df.columns
df: org.apache.spark.sql.DataFrame = [v1: SparseVector, v2: SparseVector]

df.withColumn("v3", ADD_COL_FUNCTION(col(v1), col(v2)))

Spark 中没有内置 SparseVectors 的加法函数。 DenseVector 对象可以通过将它们变成数组来处理,但是对于 SparseVector 这可能是一个内存杀手。您可以将 SparseVectors 解释为地图,然后 'add' 将地图放在一起。

import org.apache.spark.mllib.linalg.{SparseVector, Vectors, Vector}

def addVecCols(v1: SparseVector, v2: SparseVector): Vector = {
  val map1: Map[Int, Double] = (v1.indices zip v1.values).toMap

  Vectors.sparse(v1. size, 
    (map1 ++ (v2.indices zip v2.values).toMap)
      .map{ case (k, v) => k -> (v + map1.getOrElse(k, 0d))}
      .toList
  )


val addVecUDF = udf((v1: SparseVector, v2: SparseVector) => addVecCols(v1, v2))

注意在Spark 1.6中,Vectors.sparse的return类型是Vector,而在Spark2.X中是SparseVector,所以调整return 类型 addVecCols 适当。此外,在 2.X 中,您可以使用 ml 库而不是 mllib 库。

在数据帧上使用它是

val outDF = inDF.withColumn("added", addVecUDF(col("one_vector"), col("other_vector")))

这是我们对这个问题的最终解决方案。

首先,我们实现了 中提供的 Spark 和 Breeze 向量之间的隐式转换(注意评论中的错误修复)。这提供了下面代码中使用的 asBreezefromBreeze 转换。

然后我们定义了一个允许添加稀疏向量列的函数:

def addVectors(v1Col: String, v2Col: String, outputCol: String)
            : DataFrame => DataFrame = {
  df: DataFrame => {
    def add(v1: SparkVector, v2: SparkVector): SparkVector =
      (v1.asBreeze + v2.asBreeze).fromBreeze
    val func = udf((v1: SparkVector, v2: SparkVector) => add(v1, v2))
    df.withColumn(outputCol, func(col(v1Col), col(v2Col)))
  }
}

调用此函数使用:

 df.transform(addVectors(col1Name, col2name, colOutName))

当然,您可能希望包括一些列名称存在的检查,并确保输出列不会覆盖您不希望覆盖的任何内容。