Parallel 和 Rcpp Armadillo 的问题:集群 worker 之间可能存在变量损坏
Problems with Parallel and Rcpp Armadillo: Possible variable corruption between cluster workers
我在将 Rcpp(和 Rcpp Armadillo)与并行包一起使用时遇到了一些问题,并且根据我用于计算的内核数量,我得到了不正确的结果。
我有一个函数 compute_indices
,它为我的数据中的每个观察结果计算 3 组索引。它首先根据我指定的内核数量使用 parallel::makeCluster
创建一个 (FORK) 集群。然后它将我的数据分成相等的部分,并使用我之前创建的集群对象在每个部分上应用(使用 parallel::parLapply
)函数 meancorlsm
。现在 meancorlsm
基本上是我在 Rcpp 和 Rcpp Armadillo 中编写的函数(称为 covArmadilloMclsmNormal
)的包装器,因为我试图加快计算速度。但是,我有另一个完全用 R 编写的函数版本(称为 meancorlsR
),我用它来测试 RccpArmadillo 版本的正确性。
现在,如果我 运行 compute_indices
与 meancorlsm
(我首先使用 sourceCpp()
使 covArmadilloMclsmNormal
在全局环境中可用) ,根据我告诉 compute_indices
使用的内核数量,我得到部分正确的答案。具体来说,如果我使用 4 个内核,则计算出的索引的前 1/4 是正确的,如果我使用 2 个内核,则结果的前 1/2 是正确的,如果我使用单个内核,则所有结果都是正确的.我使用 meancorlsm
(meancorlsR
如前所述)的 R 版本生成的答案来检查结果的正确性。因为我在使用单核时得到了正确的结果,所以我觉得 RcppArmadillo 函数是正确的,并且可能是集群的不同 threads/workers 在计算过程中相互干扰,因此我变得很奇怪行为。
下面是compute_indices
:
compute_indices <- function(dt, nr = ncol(dt), n_core = 4){
par_cluster <- makeCluster(n_core, type = "FORK")
# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:nr, n_core)
# compute auxiliar support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN = 2, var)
data_sds <- apply(dt, 2, sd)
# compute Outliers using parapply
vectors <- do.call(rbind, parLapply(par_cluster, splits,
meancorlsm, dt, data_means,
data_vars, data_sds))
stopCluster(par_cluster)
vectors
}
和meancorlsm
meancorlsm<- function(i, mtx, means, vars, sds){
pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i], dt = mtx,
dtmeans = means, dtvars = vars,
dtsds = sds)
colnames(pre_outl) <- c("sh", "mg", "ap")
pre_outl
}
使用 covArmadilloMclsmNormal
Rcpp 函数:
#include <RcppArmadillo.h>
using namespace Rcpp;
//[[Rcpp::depends(RcppArmadillo)]]
// [[Rcpp::export]]
arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt,
arma::vec dtmeans, arma::vec dtvars,
arma::vec dtsds){
arma::mat out(dt.n_cols, dti.n_cols);
out = arma::cov(dt, dti);
int n = out.n_rows;
int p = out.n_cols;
//declare matrices to hold result
arma::vec temp(n);
arma::mat preout(3,p);
for(int i = 0; i<p ; ++i){
temp = out.col(i)/dtvars;
preout(0,i) = arma::mean((out.col(i)/dtsds))/dtsds(i);
preout(1, i) = dtmeans(i) - arma::mean(temp % dtmeans);
preout(2, i) = arma::mean(temp);
}
return preout.t();
}
现在这是我用于测试的 meancorlsm
的 R 版本:
meancorlsR <- function(i, mtx, means, vars, sds){
pre_outl <- apply(cov(mtx, mtx[,i], use = "pairwise.complete.obs"), 2,
function(col){
tmp <- col/vars
c("sh" = mean(col/sds, na.rm = T),
"mg" = mean(tmp * means, na.rm = T),
"ap" = mean(tmp, na.rm = T))
})
pre_outl[1,] <- pre_outl[1,]/sds[i]
pre_outl[2,] <- means[i] - pre_outl[2,]
t(pre_outl)
}
您可以在 compute_indices
函数中用 meancorlsR
替换 meancorlsm
函数,它会起作用(用于测试)。但是,为了立即再现,我在此处将其提供为 compute_indicesR
.
compute_indicesR <- function(dt, nr = ncol(dt), n_core = 4){
par_cluster <- makeCluster(n_core, type = "FORK")
# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:nr, n_core)
# compute auxiliar support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN = 2, var)
data_sds <- apply(dt, 2, sd)
# compute using parapply
vectors <- do.call(rbind, parLapply(par_cluster, splits,
meancorlsR, dt, data_means,
data_vars, data_sds))
stopCluster(par_cluster)
vectors
}
最后这里是 运行 的最小示例:
library(Rcpp)
library(parallel)
# use this to source the Rcpp function from a file
# to make the covArmadilloMclsmNormal function available
sourceCpp("covArmadilloMclsmNormal.cpp")
data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) #select first 10 row
indices_rcpp4 <- compute_indices(dt) # using 4 cores
indices_rcpp2 <- compute_indices(dt, n_core = 2) # using 2 cores
indices_rcpp1 <- compute_indices(dt, n_core = 1) # 1 core
# using the R version
# already replaced the meancorlsm function to meancorlsR here
indices_R <- compute_indicesR(dt) # R version
我希望所有输出都与 R 版本生成的输出相匹配,而不管我指定的内核数量如何。然而,它们是不同的。这是我使用 R 版本得到的结果:
" sh mg ap
1 0.634272567307155 -7.09315427645087 0.992492531586726
2 0.868144125333511 22.3206363514708 0.622504642756242
3 0.819231480417289 24.8027625928423 0.756088388472384
4 0.830462006956641 -6.26663378557611 1.03847748215856
5 0.836182582923674 15.0558414918816 0.901413435882058
6 0.648813304451793 23.4689784056255 0.40175151333289
7 0.839409670144446 3.73900558549848 0.883655665107456
8 0.781070895796188 13.1775081516076 0.810306856575333
9 0.772967959938828 2.24023877077873 1.1146249477264
10 0.826849986442202 3.31330282673472 0.910527502047015"
我使用 2 个内核的 Rcpp 版本得到的结果是
" sh mg ap
1 0.634272567307155 -7.09315427645086 0.992492531586726
2 0.868144125333511 22.3206363514708 0.622504642756242
3 0.819231480417289 24.8027625928424 0.756088388472384
4 0.830462006956641 -6.26663378557612 1.03847748215856
5 0.836182582923674 15.0558414918816 0.901413435882058
6 0.231943043232274 28.1832641199112 0.40175151333289
7 1.20839881621289 7.02471987121276 0.883655665107456
8 0.865212462148289 21.7489367230362 0.810306856575333
9 0.853048693647409 -10.474046943507 1.1146249477264
10 0.857055188335614 14.599017112449 0.910527502047015"
而对于使用 4 核的 Rccp 版本:
" sh mg ap
1 0.634272567307155 -7.09315427645086 0.992492531586726
2 0.868144125333511 22.3206363514708 0.622504642756242
3 0.819231480417289 24.8027625928424 0.756088388472384
4 0.648794650804865 -10.2666337855761 1.03847748215856
5 1.25119408317776 5.48441292045304 0.901413435882058
6 0.231943043232274 28.1832641199112 0.40175151333289
7 1.20839881621289 7.02471987121276 0.883655665107456
8 0.487272877566209 3.60607958017906 0.810306856575333
9 1.50139103326263 -6.75976122922128 1.1146249477264
10 1.01076542369015 15.4561599695919 0.910527502047015"
使用单核的 Rccp 版本产生了与 R 版本相同的答案,这是正确的结果。同样有趣的是,无论我使用多少内核,答案的 ap
列都保持不变,而 sh
和 mg
列发生变化。
最后,我的平台是Ubuntu16.04。似乎 FORK
集群在 Windows 上不起作用,因此您可能无法重现此结果。然而,即使使用 PSOCK
集群,我也得到了相同的行为(在这种情况下,我使用 clusterEvalQ()
来获取必要的 Cpp 函数,使它们可供工作人员使用)。非常感谢任何关于我做错了什么的帮助或见解。
别管我的评论,我误解了 Armadillo 的文档。
您的 C++ 代码正在使用 i
索引助手 dtmeans
和 dtsds
向量,
但是 i
对于每个并行实例总是从零开始,
所以你需要传递一个偏移量来指示跳过了多少列:
//[[Rcpp::depends(RcppArmadillo)]]
#include <RcppArmadillo.h>
//[[Rcpp::export]]
arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt, int offset,
arma::vec dtmeans, arma::vec dtvars, arma::vec dtsds)
{
arma::mat out = arma::cov(dt, dti);
size_t p = out.n_cols;
arma::mat preout(p,3);
for (int i = 0; i < p; ++i) {
arma::vec temp = out.col(i) / dtvars;
preout(i,0) = arma::mean((out.col(i) / dtsds)) / dtsds(i + offset);
preout(i,1) = dtmeans(i + offset) - arma::mean(temp % dtmeans);
preout(i,2) = arma::mean(temp);
}
return preout;
}
因此相应地:
meancorlsm <- function(i, mtx, means, vars, sds){
pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i, drop = FALSE], dt = mtx, offset = min(i) - 1L,
dtmeans = means, dtvars = vars,
dtsds = sds)
colnames(pre_outl) <- c("sh", "mg", "ap")
pre_outl
}
你甚至可以依次证实:
data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) #select first 10 row
# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:ncol(dt), 2)
# compute auxiliary support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN = 2, var)
data_sds <- apply(dt, 2, sd)
do.call(rbind, lapply(splits,
meancorlsm, dt, data_means,
data_vars, data_sds))
sh mg ap
[1,] 0.6342726 -7.093154 0.9924925
[2,] 0.8681441 22.320636 0.6225046
[3,] 0.8192315 24.802763 0.7560884
[4,] 0.8304620 -6.266634 1.0384775
[5,] 0.8361826 15.055841 0.9014134
[6,] 0.6488133 23.468978 0.4017515
[7,] 0.8394097 3.739006 0.8836557
[8,] 0.7810709 13.177508 0.8103069
[9,] 0.7729680 2.240239 1.1146249
[10,] 0.8268500 3.313303 0.9105275
顺便说一句,我认为 C++ 代码中的 pre-allocating 矩阵没有用,如果你只是用 =
.
覆盖它们的话
我在将 Rcpp(和 Rcpp Armadillo)与并行包一起使用时遇到了一些问题,并且根据我用于计算的内核数量,我得到了不正确的结果。
我有一个函数 compute_indices
,它为我的数据中的每个观察结果计算 3 组索引。它首先根据我指定的内核数量使用 parallel::makeCluster
创建一个 (FORK) 集群。然后它将我的数据分成相等的部分,并使用我之前创建的集群对象在每个部分上应用(使用 parallel::parLapply
)函数 meancorlsm
。现在 meancorlsm
基本上是我在 Rcpp 和 Rcpp Armadillo 中编写的函数(称为 covArmadilloMclsmNormal
)的包装器,因为我试图加快计算速度。但是,我有另一个完全用 R 编写的函数版本(称为 meancorlsR
),我用它来测试 RccpArmadillo 版本的正确性。
现在,如果我 运行 compute_indices
与 meancorlsm
(我首先使用 sourceCpp()
使 covArmadilloMclsmNormal
在全局环境中可用) ,根据我告诉 compute_indices
使用的内核数量,我得到部分正确的答案。具体来说,如果我使用 4 个内核,则计算出的索引的前 1/4 是正确的,如果我使用 2 个内核,则结果的前 1/2 是正确的,如果我使用单个内核,则所有结果都是正确的.我使用 meancorlsm
(meancorlsR
如前所述)的 R 版本生成的答案来检查结果的正确性。因为我在使用单核时得到了正确的结果,所以我觉得 RcppArmadillo 函数是正确的,并且可能是集群的不同 threads/workers 在计算过程中相互干扰,因此我变得很奇怪行为。
下面是compute_indices
:
compute_indices <- function(dt, nr = ncol(dt), n_core = 4){
par_cluster <- makeCluster(n_core, type = "FORK")
# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:nr, n_core)
# compute auxiliar support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN = 2, var)
data_sds <- apply(dt, 2, sd)
# compute Outliers using parapply
vectors <- do.call(rbind, parLapply(par_cluster, splits,
meancorlsm, dt, data_means,
data_vars, data_sds))
stopCluster(par_cluster)
vectors
}
和meancorlsm
meancorlsm<- function(i, mtx, means, vars, sds){
pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i], dt = mtx,
dtmeans = means, dtvars = vars,
dtsds = sds)
colnames(pre_outl) <- c("sh", "mg", "ap")
pre_outl
}
使用 covArmadilloMclsmNormal
Rcpp 函数:
#include <RcppArmadillo.h>
using namespace Rcpp;
//[[Rcpp::depends(RcppArmadillo)]]
// [[Rcpp::export]]
arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt,
arma::vec dtmeans, arma::vec dtvars,
arma::vec dtsds){
arma::mat out(dt.n_cols, dti.n_cols);
out = arma::cov(dt, dti);
int n = out.n_rows;
int p = out.n_cols;
//declare matrices to hold result
arma::vec temp(n);
arma::mat preout(3,p);
for(int i = 0; i<p ; ++i){
temp = out.col(i)/dtvars;
preout(0,i) = arma::mean((out.col(i)/dtsds))/dtsds(i);
preout(1, i) = dtmeans(i) - arma::mean(temp % dtmeans);
preout(2, i) = arma::mean(temp);
}
return preout.t();
}
现在这是我用于测试的 meancorlsm
的 R 版本:
meancorlsR <- function(i, mtx, means, vars, sds){
pre_outl <- apply(cov(mtx, mtx[,i], use = "pairwise.complete.obs"), 2,
function(col){
tmp <- col/vars
c("sh" = mean(col/sds, na.rm = T),
"mg" = mean(tmp * means, na.rm = T),
"ap" = mean(tmp, na.rm = T))
})
pre_outl[1,] <- pre_outl[1,]/sds[i]
pre_outl[2,] <- means[i] - pre_outl[2,]
t(pre_outl)
}
您可以在 compute_indices
函数中用 meancorlsR
替换 meancorlsm
函数,它会起作用(用于测试)。但是,为了立即再现,我在此处将其提供为 compute_indicesR
.
compute_indicesR <- function(dt, nr = ncol(dt), n_core = 4){
par_cluster <- makeCluster(n_core, type = "FORK")
# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:nr, n_core)
# compute auxiliar support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN = 2, var)
data_sds <- apply(dt, 2, sd)
# compute using parapply
vectors <- do.call(rbind, parLapply(par_cluster, splits,
meancorlsR, dt, data_means,
data_vars, data_sds))
stopCluster(par_cluster)
vectors
}
最后这里是 运行 的最小示例:
library(Rcpp)
library(parallel)
# use this to source the Rcpp function from a file
# to make the covArmadilloMclsmNormal function available
sourceCpp("covArmadilloMclsmNormal.cpp")
data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) #select first 10 row
indices_rcpp4 <- compute_indices(dt) # using 4 cores
indices_rcpp2 <- compute_indices(dt, n_core = 2) # using 2 cores
indices_rcpp1 <- compute_indices(dt, n_core = 1) # 1 core
# using the R version
# already replaced the meancorlsm function to meancorlsR here
indices_R <- compute_indicesR(dt) # R version
我希望所有输出都与 R 版本生成的输出相匹配,而不管我指定的内核数量如何。然而,它们是不同的。这是我使用 R 版本得到的结果:
" sh mg ap
1 0.634272567307155 -7.09315427645087 0.992492531586726
2 0.868144125333511 22.3206363514708 0.622504642756242
3 0.819231480417289 24.8027625928423 0.756088388472384
4 0.830462006956641 -6.26663378557611 1.03847748215856
5 0.836182582923674 15.0558414918816 0.901413435882058
6 0.648813304451793 23.4689784056255 0.40175151333289
7 0.839409670144446 3.73900558549848 0.883655665107456
8 0.781070895796188 13.1775081516076 0.810306856575333
9 0.772967959938828 2.24023877077873 1.1146249477264
10 0.826849986442202 3.31330282673472 0.910527502047015"
我使用 2 个内核的 Rcpp 版本得到的结果是
" sh mg ap
1 0.634272567307155 -7.09315427645086 0.992492531586726
2 0.868144125333511 22.3206363514708 0.622504642756242
3 0.819231480417289 24.8027625928424 0.756088388472384
4 0.830462006956641 -6.26663378557612 1.03847748215856
5 0.836182582923674 15.0558414918816 0.901413435882058
6 0.231943043232274 28.1832641199112 0.40175151333289
7 1.20839881621289 7.02471987121276 0.883655665107456
8 0.865212462148289 21.7489367230362 0.810306856575333
9 0.853048693647409 -10.474046943507 1.1146249477264
10 0.857055188335614 14.599017112449 0.910527502047015"
而对于使用 4 核的 Rccp 版本:
" sh mg ap
1 0.634272567307155 -7.09315427645086 0.992492531586726
2 0.868144125333511 22.3206363514708 0.622504642756242
3 0.819231480417289 24.8027625928424 0.756088388472384
4 0.648794650804865 -10.2666337855761 1.03847748215856
5 1.25119408317776 5.48441292045304 0.901413435882058
6 0.231943043232274 28.1832641199112 0.40175151333289
7 1.20839881621289 7.02471987121276 0.883655665107456
8 0.487272877566209 3.60607958017906 0.810306856575333
9 1.50139103326263 -6.75976122922128 1.1146249477264
10 1.01076542369015 15.4561599695919 0.910527502047015"
使用单核的 Rccp 版本产生了与 R 版本相同的答案,这是正确的结果。同样有趣的是,无论我使用多少内核,答案的 ap
列都保持不变,而 sh
和 mg
列发生变化。
最后,我的平台是Ubuntu16.04。似乎 FORK
集群在 Windows 上不起作用,因此您可能无法重现此结果。然而,即使使用 PSOCK
集群,我也得到了相同的行为(在这种情况下,我使用 clusterEvalQ()
来获取必要的 Cpp 函数,使它们可供工作人员使用)。非常感谢任何关于我做错了什么的帮助或见解。
别管我的评论,我误解了 Armadillo 的文档。
您的 C++ 代码正在使用 i
索引助手 dtmeans
和 dtsds
向量,
但是 i
对于每个并行实例总是从零开始,
所以你需要传递一个偏移量来指示跳过了多少列:
//[[Rcpp::depends(RcppArmadillo)]]
#include <RcppArmadillo.h>
//[[Rcpp::export]]
arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt, int offset,
arma::vec dtmeans, arma::vec dtvars, arma::vec dtsds)
{
arma::mat out = arma::cov(dt, dti);
size_t p = out.n_cols;
arma::mat preout(p,3);
for (int i = 0; i < p; ++i) {
arma::vec temp = out.col(i) / dtvars;
preout(i,0) = arma::mean((out.col(i) / dtsds)) / dtsds(i + offset);
preout(i,1) = dtmeans(i + offset) - arma::mean(temp % dtmeans);
preout(i,2) = arma::mean(temp);
}
return preout;
}
因此相应地:
meancorlsm <- function(i, mtx, means, vars, sds){
pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i, drop = FALSE], dt = mtx, offset = min(i) - 1L,
dtmeans = means, dtvars = vars,
dtsds = sds)
colnames(pre_outl) <- c("sh", "mg", "ap")
pre_outl
}
你甚至可以依次证实:
data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) #select first 10 row
# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:ncol(dt), 2)
# compute auxiliary support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN = 2, var)
data_sds <- apply(dt, 2, sd)
do.call(rbind, lapply(splits,
meancorlsm, dt, data_means,
data_vars, data_sds))
sh mg ap
[1,] 0.6342726 -7.093154 0.9924925
[2,] 0.8681441 22.320636 0.6225046
[3,] 0.8192315 24.802763 0.7560884
[4,] 0.8304620 -6.266634 1.0384775
[5,] 0.8361826 15.055841 0.9014134
[6,] 0.6488133 23.468978 0.4017515
[7,] 0.8394097 3.739006 0.8836557
[8,] 0.7810709 13.177508 0.8103069
[9,] 0.7729680 2.240239 1.1146249
[10,] 0.8268500 3.313303 0.9105275
顺便说一句,我认为 C++ 代码中的 pre-allocating 矩阵没有用,如果你只是用 =
.