如何利用稀疏矩阵优化对多个移位点的评估

How to make use of sparse matrix to optimise evaluation on many shifted points

我目前正在努力有效地解决以下问题:

我有两个向量 v1v2 以及一个向量化函数 f。对于 v1 中的每个 x,我想计算 f(x - v2) 的平均值。这个问题的特殊之处在于 f 将在许多输入上 return 零。

示例:

set.seed(0)

v1 <- rnorm(1000)
v2 <- rnorm(1000)

f <- function(x) {
  ret <- double(length(x)) + 1
  ret[abs(x) > 0.01] <- 0
  ret
}

solution_01 <- function(v1, v2, f) {
  ret <- numeric(length(v1))
  for (x in v2) {
    ret <- ret + f(v1 - x)
  }
  ret/length(v2)
}

solution_02 <- function(v1, v2, f) {
  apply(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)), 1, sum)/length(v2)
}

solution_03 <- function(v1, v2, f) {
  rowSums(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)))/length(v2)
}

solution_04 <- function(v1, v2, f) {
  rowMeans(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)))
}

s1 <- solution_01(v1, v2, f)
s2 <- solution_02(v1, v2, f)
s3 <- solution_03(v1, v2, f)
s4 <- solution_04(v1, v2, f)

all.equal(s1, s2)
all.equal(s2, s3)
all.equal(s3, s4)

bench::mark(solution_01(v1, v2, f), solution_02(v1, v2, f), solution_03(v1, v2, f), solution_04(v1, v2, f))

# Sparsity
eval_points <- outer(v1, v2, `-`)
sum(f(eval_points) == 0)/length(eval_points)

如您所见,我已经实施了四种可能的解决方案。目前 naive 解决方案(使用 for 循环)是最快的。我认为这是因为其他实现依赖于outer,这需要一些时间来分配所需的内存。

如何优化这段代码?有没有办法利用 f(outer(v1, v_2)) 的稀疏性?

您提出的问题(我很欣赏这可能是对您实际尝试做的事情的简化)将通过使用排序向量得到有效解决,尤其是当向量的长度超过 1000 个元素时。

由于您在 v1 中的值的一小段距离内计算 v2 中的值,因此您可以使用一种算法来推进第一个向量,直到它超出元素的范围在第二个考虑中,然后切换到通过另一个向量前进。这样你只需要通过每个向量一次而不是 length(v1) 次。

正如指出的那样,R 在这类事情上效率不高,如果您希望它非常快,您应该在 Rcpp 中编写整个代码。

...

我以为我会尝试编写一个有趣的算法,事实证明,即使用 R 编写,它在您的示例数据上的速度也快了将近 10 倍!

solution_05 <- function(v1, v2, f) {
  vs1 <- sort(v1)
  vs2 <- sort(v2)
  n1 <- integer(length(v1))
  i1 <- 1
  i2 <- 1
  l <- length(v1)
  for (x2 in vs2) {
    # advance i1 until v1 is in range below
    while (x2-vs1[i1] > 0.01 & i1 <= l) i1 <- i1+1
    if (i2 > i1) n1[i1:(i2-1)] <- n1[i1:(i2-1)]+1 else i2 <- i1
    # advance i2 until out of range adding 1 to n1[i2] each time
    while (vs1[i2]-x2 <= 0.01 & i2 <= l) {
      n1[i2] <- n1[i2]+1
      i2 <- i2+1
    }
  }
  s5 <- n1[rank(v1)]/length(v2)
  return(s5)
}