如何利用稀疏矩阵优化对多个移位点的评估
How to make use of sparse matrix to optimise evaluation on many shifted points
我目前正在努力有效地解决以下问题:
我有两个向量 v1
、v2
以及一个向量化函数 f
。对于 v1
中的每个 x
,我想计算 f(x - v2)
的平均值。这个问题的特殊之处在于 f
将在许多输入上 return 零。
示例:
set.seed(0)
v1 <- rnorm(1000)
v2 <- rnorm(1000)
f <- function(x) {
ret <- double(length(x)) + 1
ret[abs(x) > 0.01] <- 0
ret
}
solution_01 <- function(v1, v2, f) {
ret <- numeric(length(v1))
for (x in v2) {
ret <- ret + f(v1 - x)
}
ret/length(v2)
}
solution_02 <- function(v1, v2, f) {
apply(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)), 1, sum)/length(v2)
}
solution_03 <- function(v1, v2, f) {
rowSums(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)))/length(v2)
}
solution_04 <- function(v1, v2, f) {
rowMeans(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)))
}
s1 <- solution_01(v1, v2, f)
s2 <- solution_02(v1, v2, f)
s3 <- solution_03(v1, v2, f)
s4 <- solution_04(v1, v2, f)
all.equal(s1, s2)
all.equal(s2, s3)
all.equal(s3, s4)
bench::mark(solution_01(v1, v2, f), solution_02(v1, v2, f), solution_03(v1, v2, f), solution_04(v1, v2, f))
# Sparsity
eval_points <- outer(v1, v2, `-`)
sum(f(eval_points) == 0)/length(eval_points)
如您所见,我已经实施了四种可能的解决方案。目前 naive 解决方案(使用 for 循环)是最快的。我认为这是因为其他实现依赖于outer
,这需要一些时间来分配所需的内存。
如何优化这段代码?有没有办法利用 f(outer(v1, v_2))
的稀疏性?
您提出的问题(我很欣赏这可能是对您实际尝试做的事情的简化)将通过使用排序向量得到有效解决,尤其是当向量的长度超过 1000 个元素时。
由于您在 v1
中的值的一小段距离内计算 v2
中的值,因此您可以使用一种算法来推进第一个向量,直到它超出元素的范围在第二个考虑中,然后切换到通过另一个向量前进。这样你只需要通过每个向量一次而不是 length(v1)
次。
正如指出的那样,R 在这类事情上效率不高,如果您希望它非常快,您应该在 Rcpp 中编写整个代码。
...
我以为我会尝试编写一个有趣的算法,事实证明,即使用 R 编写,它在您的示例数据上的速度也快了将近 10 倍!
solution_05 <- function(v1, v2, f) {
vs1 <- sort(v1)
vs2 <- sort(v2)
n1 <- integer(length(v1))
i1 <- 1
i2 <- 1
l <- length(v1)
for (x2 in vs2) {
# advance i1 until v1 is in range below
while (x2-vs1[i1] > 0.01 & i1 <= l) i1 <- i1+1
if (i2 > i1) n1[i1:(i2-1)] <- n1[i1:(i2-1)]+1 else i2 <- i1
# advance i2 until out of range adding 1 to n1[i2] each time
while (vs1[i2]-x2 <= 0.01 & i2 <= l) {
n1[i2] <- n1[i2]+1
i2 <- i2+1
}
}
s5 <- n1[rank(v1)]/length(v2)
return(s5)
}
我目前正在努力有效地解决以下问题:
我有两个向量 v1
、v2
以及一个向量化函数 f
。对于 v1
中的每个 x
,我想计算 f(x - v2)
的平均值。这个问题的特殊之处在于 f
将在许多输入上 return 零。
示例:
set.seed(0)
v1 <- rnorm(1000)
v2 <- rnorm(1000)
f <- function(x) {
ret <- double(length(x)) + 1
ret[abs(x) > 0.01] <- 0
ret
}
solution_01 <- function(v1, v2, f) {
ret <- numeric(length(v1))
for (x in v2) {
ret <- ret + f(v1 - x)
}
ret/length(v2)
}
solution_02 <- function(v1, v2, f) {
apply(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)), 1, sum)/length(v2)
}
solution_03 <- function(v1, v2, f) {
rowSums(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)))/length(v2)
}
solution_04 <- function(v1, v2, f) {
rowMeans(matrix(f(outer(v1, v2, `-`)), nrow=length(v1)))
}
s1 <- solution_01(v1, v2, f)
s2 <- solution_02(v1, v2, f)
s3 <- solution_03(v1, v2, f)
s4 <- solution_04(v1, v2, f)
all.equal(s1, s2)
all.equal(s2, s3)
all.equal(s3, s4)
bench::mark(solution_01(v1, v2, f), solution_02(v1, v2, f), solution_03(v1, v2, f), solution_04(v1, v2, f))
# Sparsity
eval_points <- outer(v1, v2, `-`)
sum(f(eval_points) == 0)/length(eval_points)
如您所见,我已经实施了四种可能的解决方案。目前 naive 解决方案(使用 for 循环)是最快的。我认为这是因为其他实现依赖于outer
,这需要一些时间来分配所需的内存。
如何优化这段代码?有没有办法利用 f(outer(v1, v_2))
的稀疏性?
您提出的问题(我很欣赏这可能是对您实际尝试做的事情的简化)将通过使用排序向量得到有效解决,尤其是当向量的长度超过 1000 个元素时。
由于您在 v1
中的值的一小段距离内计算 v2
中的值,因此您可以使用一种算法来推进第一个向量,直到它超出元素的范围在第二个考虑中,然后切换到通过另一个向量前进。这样你只需要通过每个向量一次而不是 length(v1)
次。
正如指出的那样,R 在这类事情上效率不高,如果您希望它非常快,您应该在 Rcpp 中编写整个代码。
...
我以为我会尝试编写一个有趣的算法,事实证明,即使用 R 编写,它在您的示例数据上的速度也快了将近 10 倍!
solution_05 <- function(v1, v2, f) {
vs1 <- sort(v1)
vs2 <- sort(v2)
n1 <- integer(length(v1))
i1 <- 1
i2 <- 1
l <- length(v1)
for (x2 in vs2) {
# advance i1 until v1 is in range below
while (x2-vs1[i1] > 0.01 & i1 <= l) i1 <- i1+1
if (i2 > i1) n1[i1:(i2-1)] <- n1[i1:(i2-1)]+1 else i2 <- i1
# advance i2 until out of range adding 1 to n1[i2] each time
while (vs1[i2]-x2 <= 0.01 & i2 <= l) {
n1[i2] <- n1[i2]+1
i2 <- i2+1
}
}
s5 <- n1[rank(v1)]/length(v2)
return(s5)
}