如何正确使用 RcppThread 并行化 for 循环
How to properly use RcppThread to parallelize a for-loop
我正在尝试将 for 循环与 RcppThread 并行化。未并行版本如下所示:
IntegerVector simulate_pos(NumericVector x_pop,
NumericVector y_pop,
int n_studies,
int sample_size_min,
int sample_size_max,
bool replace,
float lower_limit,
float upper_limit){
IntegerVector pos(n_studies);
int npop = x_pop.size();
NumericVector index_pop(npop);
for (int i = 0; i < npop; i++){
index_pop[i] = i;
}
// HERE IS THE LOOP-------------------------------------------------
for (int k = 0; k < n_studies; k++){
pos[k] = simulate_one_pos(x_pop, y_pop, index_pop, sample_size_min,
sample_size_max, replace, lower_limit,
upper_limit);
}
// ------------------------------------------------------------------
return(pos);
}
现在觉得用parallelFor应该没问题了:
std::vector<int> simulate_pos(NumericVector x_pop,
NumericVector y_pop,
int n_studies,
int sample_size_min,
int sample_size_max,
bool replace,
float lower_limit,
float upper_limit,
int n_threads){
std::vector<int> pos(n_studies);
int npop = x_pop.size();
NumericVector index_pop(npop);
for (int i = 0; i < npop; i++){
index_pop[i] = i;
}
// HERE IS THE LOOP-------------------------------------------------
RcppThread::parallelFor(0, pos.size(), [&] (int i){
pos[i] = simulate_one_pos(x_pop, y_pop, index_pop, sample_size_min,
sample_size_max, replace, lower_limit,
upper_limit);
});
// -----------------------------------------------------------------
return(pos);
}
为了坚持 RcppThread 论文 (https://arxiv.org/pdf/1811.00450.pdf),我使用 std::vector
作为 return 值而不是 Rcpp 等价物 IntegerVector
.
有时代码可以运行,有时会出现堆栈不平衡错误,有时只是挂起。我假设我在概念上犯了一个很大的错误,并且必须指出我在 C++ 方面几乎是个菜鸟。
是不是几个线程同时读取同一个内存地址的问题?还是 Rcpp 数据结构(例如 NumericVector)导致问题?
完整的代码可以在github上找到:https://github.com/johannes-titz/fastpos/tree/rcppthread
为了 运行 你自己:
devtools::install_github("johannes-titz/fastpos@rcppthread")
pop <- fastpos::create_pop(0.5, 1e5)
x <- pop[,1]
y <- pop[,2]
lower_limit <- 0.4
upper_limit <- 0.6
n_studies <- 50
sample_size_min <- 20
sample_size_max <- 1000
res <- fastpos::simulate_pos(x, y, n_studies, sample_size_min, sample_size_max, TRUE, lower_limit,
upper_limit, 4)
PS:也尝试使用 pool.pushReturn
,但结果相同。
编辑:问题确实是使用 Rcpp 数据结构 (NumericVector
)。将它们全部替换为 std::vector
时,运行 没问题。现在,没有 Rcpp 糖,我必须找到一种方法如何从 std::vector
(在我在循环中调用的函数内)中采样,但这显然是值得的。
你写
Is the problem that several threads are reading the same memory address at the same time? Or are the Rcpp data structures (e.g. NumericVector) causing issues?
我倾向于说“可能”。请参阅包 RcppParallel 中的优秀文档,以及它如何从小示例构建。
我知道这听起来没有吸引力,但我真的建议从一个 非常小的函数 构建,可能有零个或一个参数慢慢添加并确保当你跨越添加 any R-accessible 或创建的内存仍然按预期工作。
可悲的是,我们不能在内部 R 代码周围打一个并行外循环并“希望最好”。 OpenMP 和朋友要求更高,R 的 single-threaded 性质施加了更多限制。
(您可能当然知道 运行 higher-level 从 R 到 R 函数的并行性,但这是一个不同的主题和方法。)
我正在尝试将 for 循环与 RcppThread 并行化。未并行版本如下所示:
IntegerVector simulate_pos(NumericVector x_pop,
NumericVector y_pop,
int n_studies,
int sample_size_min,
int sample_size_max,
bool replace,
float lower_limit,
float upper_limit){
IntegerVector pos(n_studies);
int npop = x_pop.size();
NumericVector index_pop(npop);
for (int i = 0; i < npop; i++){
index_pop[i] = i;
}
// HERE IS THE LOOP-------------------------------------------------
for (int k = 0; k < n_studies; k++){
pos[k] = simulate_one_pos(x_pop, y_pop, index_pop, sample_size_min,
sample_size_max, replace, lower_limit,
upper_limit);
}
// ------------------------------------------------------------------
return(pos);
}
现在觉得用parallelFor应该没问题了:
std::vector<int> simulate_pos(NumericVector x_pop,
NumericVector y_pop,
int n_studies,
int sample_size_min,
int sample_size_max,
bool replace,
float lower_limit,
float upper_limit,
int n_threads){
std::vector<int> pos(n_studies);
int npop = x_pop.size();
NumericVector index_pop(npop);
for (int i = 0; i < npop; i++){
index_pop[i] = i;
}
// HERE IS THE LOOP-------------------------------------------------
RcppThread::parallelFor(0, pos.size(), [&] (int i){
pos[i] = simulate_one_pos(x_pop, y_pop, index_pop, sample_size_min,
sample_size_max, replace, lower_limit,
upper_limit);
});
// -----------------------------------------------------------------
return(pos);
}
为了坚持 RcppThread 论文 (https://arxiv.org/pdf/1811.00450.pdf),我使用 std::vector
作为 return 值而不是 Rcpp 等价物 IntegerVector
.
有时代码可以运行,有时会出现堆栈不平衡错误,有时只是挂起。我假设我在概念上犯了一个很大的错误,并且必须指出我在 C++ 方面几乎是个菜鸟。
是不是几个线程同时读取同一个内存地址的问题?还是 Rcpp 数据结构(例如 NumericVector)导致问题?
完整的代码可以在github上找到:https://github.com/johannes-titz/fastpos/tree/rcppthread
为了 运行 你自己:
devtools::install_github("johannes-titz/fastpos@rcppthread")
pop <- fastpos::create_pop(0.5, 1e5)
x <- pop[,1]
y <- pop[,2]
lower_limit <- 0.4
upper_limit <- 0.6
n_studies <- 50
sample_size_min <- 20
sample_size_max <- 1000
res <- fastpos::simulate_pos(x, y, n_studies, sample_size_min, sample_size_max, TRUE, lower_limit,
upper_limit, 4)
PS:也尝试使用 pool.pushReturn
,但结果相同。
编辑:问题确实是使用 Rcpp 数据结构 (NumericVector
)。将它们全部替换为 std::vector
时,运行 没问题。现在,没有 Rcpp 糖,我必须找到一种方法如何从 std::vector
(在我在循环中调用的函数内)中采样,但这显然是值得的。
你写
Is the problem that several threads are reading the same memory address at the same time? Or are the Rcpp data structures (e.g. NumericVector) causing issues?
我倾向于说“可能”。请参阅包 RcppParallel 中的优秀文档,以及它如何从小示例构建。
我知道这听起来没有吸引力,但我真的建议从一个 非常小的函数 构建,可能有零个或一个参数慢慢添加并确保当你跨越添加 any R-accessible 或创建的内存仍然按预期工作。
可悲的是,我们不能在内部 R 代码周围打一个并行外循环并“希望最好”。 OpenMP 和朋友要求更高,R 的 single-threaded 性质施加了更多限制。
(您可能当然知道 运行 higher-level 从 R 到 R 函数的并行性,但这是一个不同的主题和方法。)