bigstatsr 的 FBM() 在使用并行 foreach 时无法正确计算矩阵,就像在简单的 for 循环中代码为 运行 时一样
FBM() of bigstatsr does not calculate the matrix correctly while using parallel foreach as it does when the code is running in a simple for loop
我需要估计一个转换矩阵。因为我有很多数据,所以我尝试 运行 使用 foreach
并行处理它,并且我尝试了 bigstatsr
的共享内存功能 FBM()
。而且似乎该函数并不总是 return 正确的结果。 (有时会。)会不会是功能不正常?
下面是代码正确运行的示例:
x <- c(1,2,1,1,3,4,4,1,2,4,1,4,3,4,4,4,3,1,3,2,3,3,3,4,2,2,3)
n <- length(unique(x))
A <- matrix(nrow = n, ncol = n, 0)
for (t in 1:(length(x) - 1)) {A[x[t], x[t + 1]] <- A[x[t], x[t + 1]] + 1}
A
这里的代码并不总是 return 正确的结果:
library(foreach)
library(doParallel)
library(bigstatsr)
cl <- makeCluster(8)
registerDoParallel(cl)
B <- FBM(n, n)
set.seed(3)
foreach (t = 1:(length(x) - 1)) %dopar% {B[x[t], x[t + 1]] <- B[x[t], x[t + 1]] + 1}
stopCluster(cl)
B[]
identical(A,B[])
使用 snow
库时也会发生同样的情况
library(snow)
library(bigstatsr)
cl <- makeCluster(8)
f.trans.m <- function(t) {
D[x[t], x[t + 1]] <<- D[x[t], x[t + 1]] + 1
}
D <- FBM(n, n)
clusterExport(cl, "f.trans.m")
clusterExport(cl, "D")
clusterExport(cl, "x")
parLapply(cl, seq(1,(length(x) - 1)), function(t) f.trans.m(t))
D[]
identical(A,D[])
我是否正确使用了软件包,还是 FBM()
中存在错误?
一个解决方案:
缺少包 flock
.
提供的文件锁
B <- FBM(n, n)
lock <- tempfile()
foreach (t = 1:(length(x) - 1)) %dopar% {
locked <- flock::lock(lock)
B[x[t], x[t + 1]] <- B[x[t], x[t + 1]] + 1
flock::unlock(locked)
}
对于这个特定示例,问题在于值的并行并发更新(参见 https://privefl.github.io/blog/a-guide-to-parallelism-in-r/#advanced-parallelism-synchronization)。
在这里,我根本不会使用并行。我宁愿选择顺序(但矢量化)访问器。
我会首先重新组合索引以递增:
library(dplyr)
ind <- data.frame(i = x[-length(x)], j = x[-1]) %>%
group_by(i, j) %>%
count()
然后,我将使用两列矩阵访问器来更新相应的值,而无需使用 R 循环。
B <- FBM(n, n, init = 0)
ind2 <- as.matrix(ind[1:2])
B[ind2] <- B[ind2] + ind[[3]]
我需要估计一个转换矩阵。因为我有很多数据,所以我尝试 运行 使用 foreach
并行处理它,并且我尝试了 bigstatsr
的共享内存功能 FBM()
。而且似乎该函数并不总是 return 正确的结果。 (有时会。)会不会是功能不正常?
下面是代码正确运行的示例:
x <- c(1,2,1,1,3,4,4,1,2,4,1,4,3,4,4,4,3,1,3,2,3,3,3,4,2,2,3)
n <- length(unique(x))
A <- matrix(nrow = n, ncol = n, 0)
for (t in 1:(length(x) - 1)) {A[x[t], x[t + 1]] <- A[x[t], x[t + 1]] + 1}
A
这里的代码并不总是 return 正确的结果:
library(foreach)
library(doParallel)
library(bigstatsr)
cl <- makeCluster(8)
registerDoParallel(cl)
B <- FBM(n, n)
set.seed(3)
foreach (t = 1:(length(x) - 1)) %dopar% {B[x[t], x[t + 1]] <- B[x[t], x[t + 1]] + 1}
stopCluster(cl)
B[]
identical(A,B[])
使用 snow
库时也会发生同样的情况
library(snow)
library(bigstatsr)
cl <- makeCluster(8)
f.trans.m <- function(t) {
D[x[t], x[t + 1]] <<- D[x[t], x[t + 1]] + 1
}
D <- FBM(n, n)
clusterExport(cl, "f.trans.m")
clusterExport(cl, "D")
clusterExport(cl, "x")
parLapply(cl, seq(1,(length(x) - 1)), function(t) f.trans.m(t))
D[]
identical(A,D[])
我是否正确使用了软件包,还是 FBM()
中存在错误?
一个解决方案:
缺少包 flock
.
B <- FBM(n, n)
lock <- tempfile()
foreach (t = 1:(length(x) - 1)) %dopar% {
locked <- flock::lock(lock)
B[x[t], x[t + 1]] <- B[x[t], x[t + 1]] + 1
flock::unlock(locked)
}
对于这个特定示例,问题在于值的并行并发更新(参见 https://privefl.github.io/blog/a-guide-to-parallelism-in-r/#advanced-parallelism-synchronization)。
在这里,我根本不会使用并行。我宁愿选择顺序(但矢量化)访问器。
我会首先重新组合索引以递增:
library(dplyr)
ind <- data.frame(i = x[-length(x)], j = x[-1]) %>%
group_by(i, j) %>%
count()
然后,我将使用两列矩阵访问器来更新相应的值,而无需使用 R 循环。
B <- FBM(n, n, init = 0)
ind2 <- as.matrix(ind[1:2])
B[ind2] <- B[ind2] + ind[[3]]