RcppParallel 结果随多线程变化
RcppParallel result changes with multiple threads
我是 Rcpp 和 RcppParallel 的新手。我正在尝试使用 RcppParallel 来优化我的 R 代码,现在我正在制作一些玩具代码来研究它们。
现在我做了一个RcppParallel代码,结果和我想的不一样。每当我尝试该功能时,结果都会发生变化。
这是我的代码
library(Rcpp)
library(RcppParallel)
library(RcppArmadillo)
library(data.table)
library(pryr)
#key<-rep(rep(c("a","b"),each=12500000))
grp<-rep(rep(c("a","b","c","d","e"),each=2500000),2)
val<-rnorm(25000000,0,8)
dat<-setDT(data.frame(grp=grp,val=val))
#raw<-setDT(data.frame(key=key,grp=grp,val=val))
na.omit(dat)
#setkey(dat,"key","grp")
setkey(dat,"grp")
#key<-as.factor(key)
grp<-as.factor(grp)
gc()
sourceCpp("test.cpp")
set.seed(1)
dist<-do.call(rbind,tapply(1:NROW(dat),as.factor(dat$grp),function(x) stats::rmultinom(1,NROW(x),rep(1/NROW(x),NROW(x)))))
setThreadOptions(numThreads=4)
for(i in 1:10) print(test(dat,table(dat$grp),dist))
setThreadOptions(numThreads=1)
for(i in 1:10) print(test(dat,table(dat$grp),dist))
这是 Rcpp 代码
#include <Rcpp.h>
#include <RcppParallel.h>
using namespace Rcpp;
using namespace RcppParallel;
// [[Rcpp::depends(RcppParallel)]]
struct INDSUM : public Worker
{
const RVector<double> val;
const RVector<int> dist;
RVector<double> output;
INDSUM(const NumericVector &_val, const IntegerVector &_dist, NumericVector &_output)
: val(_val),dist(_dist),output(_output) {}
void operator()(size_t begin, size_t end)
{
for(size_t i=begin; i< end; i++)
{
output[0] += val[i]*dist[i];
output[1] += val[i]*val[i]*dist[i];
}
}
};
// [[Rcpp::export]]
NumericMatrix test(DataFrame &df, NumericVector &grptable, IntegerVector &dist) {
IntegerVector idx = df[0];
NumericVector val = df[1];
size_t grpnum=grptable.length();
NumericMatrix output(grpnum,2);
NumericVector tmp(2);
NumericVector sum(grpnum);
NumericVector sumsq(grpnum);
INDSUM indexsum(val, dist, tmp);
for(size_t j=0, cnt=0 ; j<grpnum; j++)
{
parallelFor(cnt,cnt+grptable[j],indexsum);
sum[j] = tmp[0];
sumsq[j] = tmp[1];
tmp[0]=tmp[1]=0;
cnt+=grptable[j];
output(j,0)=sum[j];
output(j,1)=sumsq[j];
}
return output;
}
当线程数为4时,结果如下:
[1,] -1328.307 17484320
[2,] -8175.153 96214984
[3,] -17317.002 80623284
[4,] -7964.977 83306470
[5,] 16800.532 82033877
[,1] [,2]
[1,] -6886.349 79063639
[2,] -19529.382 80570964
[3,] -21653.201 80363894
[4,] -3256.842 81909243
[5,] 2266.153 79906235
[,1] [,2]
[1,] -12306.965 80778175
[2,] 2490.576 80411474
[3,] -6631.620 80495938
[4,] 1477.019 52269743
[5,] -1167.497 92402507
[,1] [,2]
[1,] -15329.571 81025309
[2,] -10860.718 76984730
[3,] 2430.499 96612706
[4,] 17321.521 97019020
[5,] 6702.637 81961180
[,1] [,2]
[1,] -20691.132 80094518
[2,] -7922.633 80567608
[3,] -6579.185 83056898
[4,] 24761.582 80163577
[5,] -8022.315 80314909
[,1] [,2]
[1,] -18693.011 80439056
[2,] -9705.923 80580004
[3,] -20444.317 89155340
[4,] 13285.827 80487654
[5,] 14155.066 84664113
[,1] [,2]
[1,] -17247.0503 80775485
[2,] -15141.5544 107365503
[3,] 315.3996 91628341
[4,] 4731.3399 81038215
[5,] 8374.9257 77583065
[,1] [,2]
[1,] -973.0294 81194199
[2,] -13473.3673 79293218
[3,] -11028.3987 80865948
[4,] 1126.7350 80539346
[5,] 12452.9127 79909947
[,1] [,2]
[1,] -18049.085 80850260
[2,] -11117.905 81232899
[3,] -12195.323 80512124
[4,] -9120.614 80557568
[5,] 6800.466 76608380
[,1] [,2]
[1,] -33611.785 76090786
[2,] -10439.372 85329662
[3,] -1847.212 92939924
[4,] -3906.611 86959278
[5,] -5839.027 85420969
而当我使用单线程时,我得到的结果是:
[,1] [,2]
[1,] -59155.958 319664972
[2,] -26054.697 320363256
[3,] -40476.176 320388784
[4,] 6573.581 320427866
[5,] 30657.222 319834396
谢谢你。
我们的代码中存在 race condition(在本例中也称为数据竞争),因为所有线程都写入相同的双元素向量 tmp
。但是,递增一个值需要三个步骤:
- 读取值
- 增加读取值
- 写回增加的值
如果所有线程在没有锁定的情况下并行执行此操作,一个线程将在另一个线程读取和写入之间回写,从而丢弃第一个线程完成的增量。
一种解决方案是使用 per-thread 输出变量,这些变量是在串行代码中的 wards 之后收集的。查看 RcppParallel::parallelReduce
也可能有意义,因为您要尝试做的是 reduce 类型的操作。
我是 Rcpp 和 RcppParallel 的新手。我正在尝试使用 RcppParallel 来优化我的 R 代码,现在我正在制作一些玩具代码来研究它们。 现在我做了一个RcppParallel代码,结果和我想的不一样。每当我尝试该功能时,结果都会发生变化。
这是我的代码
library(Rcpp)
library(RcppParallel)
library(RcppArmadillo)
library(data.table)
library(pryr)
#key<-rep(rep(c("a","b"),each=12500000))
grp<-rep(rep(c("a","b","c","d","e"),each=2500000),2)
val<-rnorm(25000000,0,8)
dat<-setDT(data.frame(grp=grp,val=val))
#raw<-setDT(data.frame(key=key,grp=grp,val=val))
na.omit(dat)
#setkey(dat,"key","grp")
setkey(dat,"grp")
#key<-as.factor(key)
grp<-as.factor(grp)
gc()
sourceCpp("test.cpp")
set.seed(1)
dist<-do.call(rbind,tapply(1:NROW(dat),as.factor(dat$grp),function(x) stats::rmultinom(1,NROW(x),rep(1/NROW(x),NROW(x)))))
setThreadOptions(numThreads=4)
for(i in 1:10) print(test(dat,table(dat$grp),dist))
setThreadOptions(numThreads=1)
for(i in 1:10) print(test(dat,table(dat$grp),dist))
这是 Rcpp 代码
#include <Rcpp.h>
#include <RcppParallel.h>
using namespace Rcpp;
using namespace RcppParallel;
// [[Rcpp::depends(RcppParallel)]]
struct INDSUM : public Worker
{
const RVector<double> val;
const RVector<int> dist;
RVector<double> output;
INDSUM(const NumericVector &_val, const IntegerVector &_dist, NumericVector &_output)
: val(_val),dist(_dist),output(_output) {}
void operator()(size_t begin, size_t end)
{
for(size_t i=begin; i< end; i++)
{
output[0] += val[i]*dist[i];
output[1] += val[i]*val[i]*dist[i];
}
}
};
// [[Rcpp::export]]
NumericMatrix test(DataFrame &df, NumericVector &grptable, IntegerVector &dist) {
IntegerVector idx = df[0];
NumericVector val = df[1];
size_t grpnum=grptable.length();
NumericMatrix output(grpnum,2);
NumericVector tmp(2);
NumericVector sum(grpnum);
NumericVector sumsq(grpnum);
INDSUM indexsum(val, dist, tmp);
for(size_t j=0, cnt=0 ; j<grpnum; j++)
{
parallelFor(cnt,cnt+grptable[j],indexsum);
sum[j] = tmp[0];
sumsq[j] = tmp[1];
tmp[0]=tmp[1]=0;
cnt+=grptable[j];
output(j,0)=sum[j];
output(j,1)=sumsq[j];
}
return output;
}
当线程数为4时,结果如下:
[1,] -1328.307 17484320
[2,] -8175.153 96214984
[3,] -17317.002 80623284
[4,] -7964.977 83306470
[5,] 16800.532 82033877
[,1] [,2]
[1,] -6886.349 79063639
[2,] -19529.382 80570964
[3,] -21653.201 80363894
[4,] -3256.842 81909243
[5,] 2266.153 79906235
[,1] [,2]
[1,] -12306.965 80778175
[2,] 2490.576 80411474
[3,] -6631.620 80495938
[4,] 1477.019 52269743
[5,] -1167.497 92402507
[,1] [,2]
[1,] -15329.571 81025309
[2,] -10860.718 76984730
[3,] 2430.499 96612706
[4,] 17321.521 97019020
[5,] 6702.637 81961180
[,1] [,2]
[1,] -20691.132 80094518
[2,] -7922.633 80567608
[3,] -6579.185 83056898
[4,] 24761.582 80163577
[5,] -8022.315 80314909
[,1] [,2]
[1,] -18693.011 80439056
[2,] -9705.923 80580004
[3,] -20444.317 89155340
[4,] 13285.827 80487654
[5,] 14155.066 84664113
[,1] [,2]
[1,] -17247.0503 80775485
[2,] -15141.5544 107365503
[3,] 315.3996 91628341
[4,] 4731.3399 81038215
[5,] 8374.9257 77583065
[,1] [,2]
[1,] -973.0294 81194199
[2,] -13473.3673 79293218
[3,] -11028.3987 80865948
[4,] 1126.7350 80539346
[5,] 12452.9127 79909947
[,1] [,2]
[1,] -18049.085 80850260
[2,] -11117.905 81232899
[3,] -12195.323 80512124
[4,] -9120.614 80557568
[5,] 6800.466 76608380
[,1] [,2]
[1,] -33611.785 76090786
[2,] -10439.372 85329662
[3,] -1847.212 92939924
[4,] -3906.611 86959278
[5,] -5839.027 85420969
而当我使用单线程时,我得到的结果是:
[,1] [,2]
[1,] -59155.958 319664972
[2,] -26054.697 320363256
[3,] -40476.176 320388784
[4,] 6573.581 320427866
[5,] 30657.222 319834396
谢谢你。
我们的代码中存在 race condition(在本例中也称为数据竞争),因为所有线程都写入相同的双元素向量 tmp
。但是,递增一个值需要三个步骤:
- 读取值
- 增加读取值
- 写回增加的值
如果所有线程在没有锁定的情况下并行执行此操作,一个线程将在另一个线程读取和写入之间回写,从而丢弃第一个线程完成的增量。
一种解决方案是使用 per-thread 输出变量,这些变量是在串行代码中的 wards 之后收集的。查看 RcppParallel::parallelReduce
也可能有意义,因为您要尝试做的是 reduce 类型的操作。