这个自定义过程如何并行完成?还是多核?
How could this custom process be done in parallel? or multicores?
我想弄清楚如何在我创建的这个随机森林循环中使用任何并行处理包,如 foreach 或 doParallel:
ModelInfo <- data.frame ( model=as.numeric()
,Nodesize=as.numeric()
,Mrty=as.numeric()
,Maxdepth=as.numeric()
,Cp=as.numeric()
,Accuracy_Training=as.numeric()
,AUC_Training=as.numeric())
w=1
set.seed(1809)
NumberOfSamples=1
# Number of iterations
rfPred=list()
pred=list()
roundpred=list()
cTab=list()
Acc=list()
pred.to.roc=list()
pred.rocr=list()
perf.rocr=list()
AUC=list()
Var_imp=list()
rf_model_tr = list()
length(rf_model_tr) <- NumberOfSamples
for (i in 1:NumberOfSamples)
{
rf_model_tr[[i]] = list()
rfPred[[i]]=list()
pred[[i]]=list()
roundpred[[i]]=list()
cTab[[i]]=list()
Acc[[i]]=list()
pred.to.roc[[i]]=list()
pred.rocr[[i]]=list()
perf.rocr[[i]]=list()
AUC[[i]]=list()
Var_imp[[i]]=list()
## Tune nodesize
nodesize =c(10,20,50,80,100,200)
n=length(nodesize)
length(rf_model_tr[[i]]) <- n
for ( j in 1: length (nodesize))
{
rf_model_tr[[i]][[j]] = list()
rfPred[[i]][[j]]=list()
pred[[i]][[j]]=list()
roundpred[[i]][[j]]=list()
cTab[[i]][[j]]=list()
Acc[[i]][[j]]=list()
pred.to.roc[[i]][[j]]=list()
pred.rocr[[i]][[j]]=list()
perf.rocr[[i]][[j]]=list()
AUC[[i]][[j]]=list()
Var_imp[[i]][[j]]=list()
## Tune mrty
mrtysize =c(2,3,4)
m=length(mrtysize)
length(rf_model_tr[[i]][[j]]) <- m
for ( k in 1: length (mrtysize))
{
rf_model_tr[[i]][[j]][[k]] = list()
rfPred[[i]][[j]][[k]]=list()
pred[[i]][[j]][[k]]=list()
roundpred[[i]][[j]][[k]]=list()
cTab[[i]][[j]][[k]]=list()
Acc[[i]][[j]][[k]]=list()
pred.to.roc[[i]][[j]][[k]]=list()
pred.rocr[[i]][[j]][[k]]=list()
perf.rocr[[i]][[j]][[k]]=list()
AUC[[i]][[j]][[k]]=list()
Var_imp[[i]][[j]][[k]]=list()
## Tune maxdepth
maxdep =c(10,20,30)
z=length(maxdep)
length(rf_model_tr[[i]][[j]][[k]]) <- z
for (l in 1:length (maxdep))
{
rf_model_tr[[i]][[j]][[k]][[l]] = list()
rfPred[[i]][[j]][[k]][[l]]=list()
pred[[i]][[j]][[k]][[l]]=list()
roundpred[[i]][[j]][[k]][[l]]=list()
cTab[[i]][[j]][[k]][[l]]=list()
Acc[[i]][[j]][[k]][[l]]=list()
pred.to.roc[[i]][[j]][[k]][[l]]=list()
pred.rocr[[i]][[j]][[k]][[l]]=list()
perf.rocr[[i]][[j]][[k]][[l]]=list()
AUC[[i]][[j]][[k]][[l]]=list()
Var_imp[[i]][[j]][[k]][[l]]=list()
## Tune cp
cp =c(0,0.01,0.001)
p=length(cp)
length(rf_model_tr[[i]][[j]][[k]][[l]]) <- p
for (m in 1:length (cp))
{
rf_model_tr[[i]][[j]][[k]][[l]][[m]]= randomForest (as.factor(class) ~.
, data=train,mtry=mrtysize[[k]],maxDepth = maxdep[[l]], replace=F, importance=T, do.trace=10, ntree=200,nodesize=nodesize[j],cp=cp[[m]])
#Accuracy
rfPred[[i]][[j]][[k]][[l]][[m]] <- predict(rf_model_tr[[i]][[j]][[k]][[l]][[m]], train, type = "prob")
pred[[i]][[j]][[k]][[l]][[m]] <- colnames(rfPred[[i]][[j]][[k]][[l]][[m]] )[apply(rfPred[[i]][[j]][[k]][[l]][[m]] ,1,which.max)]
cTab[[i]][[j]][[k]][[l]][[m]] = table(pred[[i]][[j]][[k]][[l]][[m]],train$class)
Acc[[i]][[j]][[k]][[l]][[m]]<- sum(diag(cTab[[i]][[j]][[k]][[l]][[m]])) / sum(cTab[[i]][[j]][[k]][[l]][[m]])
#AUC
pred.to.roc[[i]][[j]][[k]][[l]][[m]]<-rfPred[[i]][[j]][[k]][[l]][[m]][,2]
pred.rocr[[i]][[j]][[k]][[l]][[m]]<-prediction(pred.to.roc[[i]][[j]][[k]][[l]][[m]],as.factor(train$class))
perf.rocr[[i]][[j]][[k]][[l]][[m]]<-performance(pred.rocr[[i]][[j]][[k]][[l]][[m]],measure="auc",x.measure="cutoff")
AUC[[i]][[j]][[k]][[l]][[m]]<-as.numeric(perf.rocr[[i]][[j]][[k]][[l]][[m]]@y.values)
#Variable Importance
Var_imp[[i]][[j]][[k]][[l]][[m]]<-(importance(rf_model_tr[[i]][[j]][[k]][[l]][[m]],type=2))
ModelInfo[w,1]<-w
ModelInfo[w,2]<-nodesize[[j]]
ModelInfo[w,3]<-mrtysize[[k]]
ModelInfo[w,4]<-maxdep[[l]]
ModelInfo[w,5]<-cp[[m]]
ModelInfo[w,6]<-Acc[[i]][[j]][[k]][[l]][[m]]
ModelInfo[w,7]<-AUC[[i]][[j]][[k]][[l]][[m]]
w=w+1
}
}
}
}
}
基本上,我正在做的是根据随机森林的可用调整参数(节点大小、cp 等)使用一个数据集创建所有可能的模型变体,并将该信息存储到 table每次迭代时的模型信息。另外我添加了准确率和AUC等指标,以便比较最终创建的不同模型并做出选择。
我正在寻找替代方案的原因是,尽管我确实有机会 运行 parRF 可以解决我的问题,但 caret 包只为我提供了调整 mtry 的机会,但我更喜欢在这里合并一些东西,那怎么可能?
我已经阅读了有关 foreach 和 doParallel 包的信息,但我不太明白这里的语法。
如果需要初始数据请告诉我,此时我只是想展示需要并行计算的部分。
提前致谢
您好,我通常只是手动编写所有代码。在 linux/mac 中,我使用并行包和可以使用内存分叉的 mclapply。分叉进程使用更少的内存并且启动速度更快。 Windows 不支持分叉,因此我使用 doParallel 包(其他包也可以)。 foreach() 函数是一个用户友好的并行映射器。我发现自己花在设置单台 PC 并行计算上的时间比节省加速时间要多。还是很有趣:)
如果您在大学工作,您可能可以访问大型集群。 BatchJobs 包是另一个可以使用许多不同后端的映射器,例如一个 Torque/PBS 问题系统。我可以借用 80 个节点和 4 个 CPU 给我潜在的 320 倍加速(实际上更像是 150 倍)。 I learned about BatchJobs from this great introduction。我喜欢 BatchJobs 还可以 运行 本地单核或多核,这更容易调试。
下面的代码介绍了如何使用foreach 和BatchJobs 创建作业列表。每个作业都是一组参数。工作参数与标准参数融合,并训练模型。返回一些统计数据,所有结果和参数组合成 data.frame.
useForeach = FALSE #If FALSE, will run as batchjobs. Only faster for cluster computing.
library(randomForest)
#load a data set
url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv"
download.file(url,destfile="winequality-white.csv",mode="w")
wwq = read.csv(file="winequality-white.csv",header=T,sep=";")
X = wwq[,names(wwq) != "quality"]
y = wwq[,"quality"]
#2 - make jobs
pars = expand.grid(
mtry = c(1:3),
sampsize = floor(seq(1000,1898,length.out = 3)),
nodesize = c(1,3)
)
jobs = lapply(1:dim(pars)[1], function(i) pars[i,])
#3 - make node function there will excute a number of jobs
test.pars = function(someJobs,useForeach=TRUE) {
#if running cluster, global environment imported manually
if(!useForeach) load(file="thisState.rda")
do.call(rbind,lapply(someJobs,function(aJob){ #do jobs and bind results by rows
print(aJob)
merged.args = c(alist(x=X,y=y),as.list(aJob)) #merge std. and job args
run.time = system.time({rfo = do.call(randomForest,merged.args)}) #run a job
data.frame(accuracy=tail(rfo$rsq,1),run.time=run.time[3],mse=tail(rfo$mse,1))
}))
}
##test function single core
jobsToTest = 1:5
out = test.pars(jobs[jobsToTest])
print(cbind(out,do.call(rbind,jobs[jobsToTest])))
#4a execute jobs with foreach package:
if(useForeach) {
library(foreach)
library(doParallel)
CPUs=4
cl = makeCluster(CPUs)#works both for windows and linux, otherwise forking is better
registerDoParallel(cl)
nodes=min(CPUs,length(jobs)) #how many splits of jobList, not so important for foreach...
job.array = suppressWarnings(split(jobs,1:nodes)) #split warns if each core cannot get same amount of jobs
out = foreach(i=job.array,.combine=rbind,.packages="randomForest") %dopar% test.pars(i)
stopCluster(cl)
} else {
library(BatchJobs)
#4b - execute jobs with BatchJobs package (read manual how to set up on cluster)
nodes=min(80,length(jobs)) # how many nodes to split job onto
job.array = split(jobs,1:nodes)
save(list=ls(),file="thisState.rda") #export this state(global environment) to every node
#initiate run
reg = makeRegistry(id ="myFirstBatchJob",packages="randomForest")
batchMap(reg,fun=test.pars,someJobs = job.array,more.args=list(useForeach=FALSE))
submitJobs(reg)
waitForJobs(reg)
out = loadResults(reg)
#6- wrap up save filnalResults to user
finalResult = cbind(do.call(rbind,jobs),do.call(rbind,out))
save(out,file="finalResult.rda")
removeRegistry(reg,ask="no")
}
#7- print final result
print(cbind(do.call(rbind,jobs),out))
我想弄清楚如何在我创建的这个随机森林循环中使用任何并行处理包,如 foreach 或 doParallel:
ModelInfo <- data.frame ( model=as.numeric()
,Nodesize=as.numeric()
,Mrty=as.numeric()
,Maxdepth=as.numeric()
,Cp=as.numeric()
,Accuracy_Training=as.numeric()
,AUC_Training=as.numeric())
w=1
set.seed(1809)
NumberOfSamples=1
# Number of iterations
rfPred=list()
pred=list()
roundpred=list()
cTab=list()
Acc=list()
pred.to.roc=list()
pred.rocr=list()
perf.rocr=list()
AUC=list()
Var_imp=list()
rf_model_tr = list()
length(rf_model_tr) <- NumberOfSamples
for (i in 1:NumberOfSamples)
{
rf_model_tr[[i]] = list()
rfPred[[i]]=list()
pred[[i]]=list()
roundpred[[i]]=list()
cTab[[i]]=list()
Acc[[i]]=list()
pred.to.roc[[i]]=list()
pred.rocr[[i]]=list()
perf.rocr[[i]]=list()
AUC[[i]]=list()
Var_imp[[i]]=list()
## Tune nodesize
nodesize =c(10,20,50,80,100,200)
n=length(nodesize)
length(rf_model_tr[[i]]) <- n
for ( j in 1: length (nodesize))
{
rf_model_tr[[i]][[j]] = list()
rfPred[[i]][[j]]=list()
pred[[i]][[j]]=list()
roundpred[[i]][[j]]=list()
cTab[[i]][[j]]=list()
Acc[[i]][[j]]=list()
pred.to.roc[[i]][[j]]=list()
pred.rocr[[i]][[j]]=list()
perf.rocr[[i]][[j]]=list()
AUC[[i]][[j]]=list()
Var_imp[[i]][[j]]=list()
## Tune mrty
mrtysize =c(2,3,4)
m=length(mrtysize)
length(rf_model_tr[[i]][[j]]) <- m
for ( k in 1: length (mrtysize))
{
rf_model_tr[[i]][[j]][[k]] = list()
rfPred[[i]][[j]][[k]]=list()
pred[[i]][[j]][[k]]=list()
roundpred[[i]][[j]][[k]]=list()
cTab[[i]][[j]][[k]]=list()
Acc[[i]][[j]][[k]]=list()
pred.to.roc[[i]][[j]][[k]]=list()
pred.rocr[[i]][[j]][[k]]=list()
perf.rocr[[i]][[j]][[k]]=list()
AUC[[i]][[j]][[k]]=list()
Var_imp[[i]][[j]][[k]]=list()
## Tune maxdepth
maxdep =c(10,20,30)
z=length(maxdep)
length(rf_model_tr[[i]][[j]][[k]]) <- z
for (l in 1:length (maxdep))
{
rf_model_tr[[i]][[j]][[k]][[l]] = list()
rfPred[[i]][[j]][[k]][[l]]=list()
pred[[i]][[j]][[k]][[l]]=list()
roundpred[[i]][[j]][[k]][[l]]=list()
cTab[[i]][[j]][[k]][[l]]=list()
Acc[[i]][[j]][[k]][[l]]=list()
pred.to.roc[[i]][[j]][[k]][[l]]=list()
pred.rocr[[i]][[j]][[k]][[l]]=list()
perf.rocr[[i]][[j]][[k]][[l]]=list()
AUC[[i]][[j]][[k]][[l]]=list()
Var_imp[[i]][[j]][[k]][[l]]=list()
## Tune cp
cp =c(0,0.01,0.001)
p=length(cp)
length(rf_model_tr[[i]][[j]][[k]][[l]]) <- p
for (m in 1:length (cp))
{
rf_model_tr[[i]][[j]][[k]][[l]][[m]]= randomForest (as.factor(class) ~.
, data=train,mtry=mrtysize[[k]],maxDepth = maxdep[[l]], replace=F, importance=T, do.trace=10, ntree=200,nodesize=nodesize[j],cp=cp[[m]])
#Accuracy
rfPred[[i]][[j]][[k]][[l]][[m]] <- predict(rf_model_tr[[i]][[j]][[k]][[l]][[m]], train, type = "prob")
pred[[i]][[j]][[k]][[l]][[m]] <- colnames(rfPred[[i]][[j]][[k]][[l]][[m]] )[apply(rfPred[[i]][[j]][[k]][[l]][[m]] ,1,which.max)]
cTab[[i]][[j]][[k]][[l]][[m]] = table(pred[[i]][[j]][[k]][[l]][[m]],train$class)
Acc[[i]][[j]][[k]][[l]][[m]]<- sum(diag(cTab[[i]][[j]][[k]][[l]][[m]])) / sum(cTab[[i]][[j]][[k]][[l]][[m]])
#AUC
pred.to.roc[[i]][[j]][[k]][[l]][[m]]<-rfPred[[i]][[j]][[k]][[l]][[m]][,2]
pred.rocr[[i]][[j]][[k]][[l]][[m]]<-prediction(pred.to.roc[[i]][[j]][[k]][[l]][[m]],as.factor(train$class))
perf.rocr[[i]][[j]][[k]][[l]][[m]]<-performance(pred.rocr[[i]][[j]][[k]][[l]][[m]],measure="auc",x.measure="cutoff")
AUC[[i]][[j]][[k]][[l]][[m]]<-as.numeric(perf.rocr[[i]][[j]][[k]][[l]][[m]]@y.values)
#Variable Importance
Var_imp[[i]][[j]][[k]][[l]][[m]]<-(importance(rf_model_tr[[i]][[j]][[k]][[l]][[m]],type=2))
ModelInfo[w,1]<-w
ModelInfo[w,2]<-nodesize[[j]]
ModelInfo[w,3]<-mrtysize[[k]]
ModelInfo[w,4]<-maxdep[[l]]
ModelInfo[w,5]<-cp[[m]]
ModelInfo[w,6]<-Acc[[i]][[j]][[k]][[l]][[m]]
ModelInfo[w,7]<-AUC[[i]][[j]][[k]][[l]][[m]]
w=w+1
}
}
}
}
}
基本上,我正在做的是根据随机森林的可用调整参数(节点大小、cp 等)使用一个数据集创建所有可能的模型变体,并将该信息存储到 table每次迭代时的模型信息。另外我添加了准确率和AUC等指标,以便比较最终创建的不同模型并做出选择。
我正在寻找替代方案的原因是,尽管我确实有机会 运行 parRF 可以解决我的问题,但 caret 包只为我提供了调整 mtry 的机会,但我更喜欢在这里合并一些东西,那怎么可能?
我已经阅读了有关 foreach 和 doParallel 包的信息,但我不太明白这里的语法。
如果需要初始数据请告诉我,此时我只是想展示需要并行计算的部分。
提前致谢
您好,我通常只是手动编写所有代码。在 linux/mac 中,我使用并行包和可以使用内存分叉的 mclapply。分叉进程使用更少的内存并且启动速度更快。 Windows 不支持分叉,因此我使用 doParallel 包(其他包也可以)。 foreach() 函数是一个用户友好的并行映射器。我发现自己花在设置单台 PC 并行计算上的时间比节省加速时间要多。还是很有趣:)
如果您在大学工作,您可能可以访问大型集群。 BatchJobs 包是另一个可以使用许多不同后端的映射器,例如一个 Torque/PBS 问题系统。我可以借用 80 个节点和 4 个 CPU 给我潜在的 320 倍加速(实际上更像是 150 倍)。 I learned about BatchJobs from this great introduction。我喜欢 BatchJobs 还可以 运行 本地单核或多核,这更容易调试。
下面的代码介绍了如何使用foreach 和BatchJobs 创建作业列表。每个作业都是一组参数。工作参数与标准参数融合,并训练模型。返回一些统计数据,所有结果和参数组合成 data.frame.
useForeach = FALSE #If FALSE, will run as batchjobs. Only faster for cluster computing.
library(randomForest)
#load a data set
url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv"
download.file(url,destfile="winequality-white.csv",mode="w")
wwq = read.csv(file="winequality-white.csv",header=T,sep=";")
X = wwq[,names(wwq) != "quality"]
y = wwq[,"quality"]
#2 - make jobs
pars = expand.grid(
mtry = c(1:3),
sampsize = floor(seq(1000,1898,length.out = 3)),
nodesize = c(1,3)
)
jobs = lapply(1:dim(pars)[1], function(i) pars[i,])
#3 - make node function there will excute a number of jobs
test.pars = function(someJobs,useForeach=TRUE) {
#if running cluster, global environment imported manually
if(!useForeach) load(file="thisState.rda")
do.call(rbind,lapply(someJobs,function(aJob){ #do jobs and bind results by rows
print(aJob)
merged.args = c(alist(x=X,y=y),as.list(aJob)) #merge std. and job args
run.time = system.time({rfo = do.call(randomForest,merged.args)}) #run a job
data.frame(accuracy=tail(rfo$rsq,1),run.time=run.time[3],mse=tail(rfo$mse,1))
}))
}
##test function single core
jobsToTest = 1:5
out = test.pars(jobs[jobsToTest])
print(cbind(out,do.call(rbind,jobs[jobsToTest])))
#4a execute jobs with foreach package:
if(useForeach) {
library(foreach)
library(doParallel)
CPUs=4
cl = makeCluster(CPUs)#works both for windows and linux, otherwise forking is better
registerDoParallel(cl)
nodes=min(CPUs,length(jobs)) #how many splits of jobList, not so important for foreach...
job.array = suppressWarnings(split(jobs,1:nodes)) #split warns if each core cannot get same amount of jobs
out = foreach(i=job.array,.combine=rbind,.packages="randomForest") %dopar% test.pars(i)
stopCluster(cl)
} else {
library(BatchJobs)
#4b - execute jobs with BatchJobs package (read manual how to set up on cluster)
nodes=min(80,length(jobs)) # how many nodes to split job onto
job.array = split(jobs,1:nodes)
save(list=ls(),file="thisState.rda") #export this state(global environment) to every node
#initiate run
reg = makeRegistry(id ="myFirstBatchJob",packages="randomForest")
batchMap(reg,fun=test.pars,someJobs = job.array,more.args=list(useForeach=FALSE))
submitJobs(reg)
waitForJobs(reg)
out = loadResults(reg)
#6- wrap up save filnalResults to user
finalResult = cbind(do.call(rbind,jobs),do.call(rbind,out))
save(out,file="finalResult.rda")
removeRegistry(reg,ask="no")
}
#7- print final result
print(cbind(do.call(rbind,jobs),out))