在准备两个 SparseVectors 的逐元素乘法时避免使用 ListBuffer

Avoid ListBuffer while preparing an element-wise multiplication of two SparseVectors

我正在尝试实现两个 ml.linalg.SparseVector 实例的逐元素乘法(也称为 Hadamard 乘积)。

A SparseVector 表示一个向量,但不是让 space 被所有“0”值占用,而是将它们省略。该向量表示为两个索引和值列表。

例如:SparseVector(indices: [0, 100, 100000], values: [0.25, 1, 0.8]) 简明地表示一个包含 100,000 个元素的数组,其中只有 3 个值是非零值。

我现在需要其中两个的逐元素乘法,但似乎没有内置的。从概念上讲,它应该很简单——它们没有共同点的任何索引都被删除,对于共同的索引,数字相乘。

例如:SparseVector(indices: [0, 500, 100000], values: [10, 1, 10])与上面相乘时应return:SparseVector(indices: [0, 100000], values: [2.5, 8])

遗憾的是,我没有找到内置的。我有一种一次性完成此操作的方法,但它不是很简单,它必须在循环中建立列表,因为它会发现哪些索引是共同的,然后为每个索引获取相应的值(具有相同的基本位置,但在第二个数组中)。

import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions.udf
import scala.collection.mutable.ListBuffer

// Return a new SparseVector whose values are the element-wise product (Hadamard product)
val multSparseVectors = udf((v1: SparseVector, v2: SparseVector) => {
  // val commonIndexes = v1.indices.intersect(v2.indices); // Missing scale factors are assumed to have a value of 0, so only common elements remain
  // TODO: No clear way to map common indices to the values that go with those indices. E.g. no "valueForIndex" method
  // new SparseVector(v1.size, commonIndexes, commonIndexes.map(i => v1.valueForIndex(i) * v2.valueForIndex(i)).toArray);
  
  val indices = ListBuffer[Int](); // TODO: Some way to do this without mutable lists?
  val values = ListBuffer[Double]();
  var v1Pos = 0; // Current index of SparseVector v1 (we will be making a single pass)
  var v2pos = 0; // Current index of SparseVector v2 (we will be making a single pass)
  while(v1Pos < v1.indices.length && v2pos < v2.indices.length) {
    while(v1.indices(v1Pos) < v2.indices(v2pos))
      v2pos += 1; // Advance our position in SparseVector 2 until we've matched or passed the current SparseVector 1 index
    if(v2pos > v2.indices.length && v1.indices(v1Pos) == v2.indices(v2pos)) {
      indices += v1.indices(v1Pos);
      values += v1.values(v1Pos) * v2.values(v2pos);
    }
    v1Pos += 1;
  }
  new SparseVector(v1.size, indices.toArray, values.toArray);
})
spark.udf.register("multSparseVectors", multSparseVectors)

谁能想出一种使用地图或类似工具来完成此操作的方法?我的主要目标是我想避免在第二个向量上进行多次 O(N) 传递以“查找”indices 列表中某个值的位置,以便我可以获取相应的 values条目,因为当我知道可能有 O(K + N) 的解决方案时,这将花费 O(K + N*2) 的时间。

我想出了一个解决方案,将这个问题转化为一个更一般的问题:

给出上述问题的答案(两个数组 v1.indicesv2.indices 相交的位置),我们可以简单地使用这些索引提取新的 SparseVector 索引,以及来自每个向量相乘。

解决方法如下:

%scala
import scala.annotation.tailrec
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions.udf

// This fanciness from  finds the indices at which two lists intersect
@tailrec
def indicesOfIntersection(left: List[Int], right: List[Int], lidx: Int = 0, ridx: Int = 0, result: List[(Int, Int)] = Nil): List[(Int, Int)] = (left, right) match { 
    case (Nil, _) | (_, Nil) => result.reverse
    case (l::tail, r::_) if l < r => indicesOfIntersection(tail, right, lidx+1, ridx, result)
    case (l::_, r::tail) if l > r => indicesOfIntersection(left, tail, lidx, ridx+1,  result)
    case (l::ltail, r::rtail) => indicesOfIntersection(ltail, rtail, lidx+1, ridx+1, (lidx, ridx) :: result)
}

// Return a new SparseVector whose values are the element-wise product (Hadamard product)
val multSparseVectors = udf((v1: SparseVector, v2: SparseVector) => {
  val intersection = indicesOfIntersection(v1.indices.toList, v2.indices.toList);
  new SparseVector(v1.size,
    intersection.map{case (x1,_) => v1.indices(x1)}.toArray,
    intersection.map{case (x1,x2) => v1.values(x1) * v2.values(x2)}.toArray);
})
spark.udf.register("multSparseVectors", multSparseVectors)