使用foreach和doSNOW并行调用外部程序:如何导入结果?

Calling external program in parallel using foreach and doSNOW: How to import results?

我正在使用 R 在具有多个节点和多个内核的集群上并行调用外部程序。外部程序需要三个输入数据文件并生成一个输出文件(所有文件都存储在同一子文件夹中)。 为了并行 运行 程序(或者更确切地说以并行方式调用它),我最初将 foreach 函数与 doParallel 库一起使用。只要我只是在单个节点上使用多个内核,这就可以正常工作。

但是,我想使用具有多个内核的多个节点。因此我相应地修改了我的代码以将 doSNOW 库与 foreach 结合使用(我尝试了 RmpidoMPI,但我没有设法 运行使用这些库在多个节点上编写代码)。 这很好用,我。 e.外部程序现在确实 运行 在多个节点(具有多个内核)上并且集群日志文件显示,它产生了所需的结果。但是,我现在面临的问题是外部程序不再将 results/output 文件存储在主 node/in 工作目录的指定子文件夹中(当我使用 doParallel).这使得我无法将结果导入 R。

事实上,如果我检查相关文件夹的内容,它不包含任何输出文件,尽管日志文件清楚地显示外部程序 运行 成功。我猜它们存储在不同的节点上(?)。 我必须对我的 foreach 功能或我设置集群的方式进行哪些修改,才能将这些文件保存在我的工作目录中的主 node/in 指定子文件夹中?

这里有一些示例 R 代码,用于展示我在做什么:

# #Set working directory in non-interactive mode
setwd(system("pwd", intern = T))


# #Load some libraries
library(foreach)
library(parallel)
library(doParallel)


# ####Parallel tasks####
# #Create doSNOW cluster for parallel tasks
library(doSNOW)
nCoresPerNode <- as.numeric(Sys.getenv("PBS_NUM_PPN"))
nodeNames <- system("cat $PBS_NODEFILE | uniq", intern=TRUE)
machines <- rep(nodeNames, each = nCoresPerNode)
cl <- makeCluster(machines, type = "SOCK")
registerDoSNOW(cl)

# #How many workers are we using?
getDoParWorkers()

#####DUMMY CODE#####
# #The following 3 lines of code are just dummy code: 
# #The idea is to create input files for the external program "myprogram"
external_Command_Script.cmd # #command file necessary for external program "myprogram" to run
startdata # #some input data for "myprogram"
enddata # #additional input data for "myprogram"
####DUMMY CODE######


# #Write necessary command and data files for external program: THIS WORKS!
for(i in 1:100)){
  write(external_Command_Script.cmd[[i]], file=paste("./mysubfolder/external_Command_Script.",i,".cmd", sep=""))
  write.table(startdata, file=paste("./mysubfolder/","startdata.",i,".txt", sep=""), col.names = FALSE, quote=FALSE)
  write.table(enddata, file=paste("./mysubfolder/","enddata.",i,".txt", sep=""), col.names = FALSE, quote=FALSE)
}


# #Run external program "myprogram" in parallel: THIS WORKS!
foreach(i = 1:100)) %dopar% {
    system(paste('(cd ./mysubfolder && ',"myprogram",' ' ,"enddata.",i,".txt ", "startdata.",i,".txt", sep="",' < external_Command_Script.',i,'.cmd)'))
}



# #Import results of external program: THIS DOES NOT WORK WHEN RUN ON MULTIPLE NODES!
results <- list()
for(i in 1:100)){
    results[[i]] = read.table(paste("./mysubfolder/","enddata.txt.",i,".log.txt", sep=""), sep = "\t",  quote="\"", header = TRUE)
}

# #The import does NOT work as the files created by the external program are NOT stored on the master node/in the
# #subfolder of the working directory!
# #Instead I get the following error message:
# #sh: line 0: cd: ./mysubfolder: No such file or directory
# #Error in { : task 6 failed - "cannot open the connection"

我的集群 pbs 脚本如下所示:

#!/bin/bash
# request resources:
#PBS -l nodes=2:ppn=8
#PBS -l walltime=00:30:00
module add languages/R-3.3.3-ATLAS
export PBS_O_WORKDIR="/panfs/panasas01/gely/xxxxxxx/workingdirectory"
# on compute node, change directory to 'submission directory':
cd $PBS_O_WORKDIR

# run your program and time it:
time Rscript ./R_script.R

问题已解决: 我犯了一个错误:外部程序实际上不是 运行ning - 我误解了日志文件。外部程序不运行的原因是找不到子文件夹(包含必要的输入数据)。好像集群默认的是用户目录,而不是pbs提交脚本中指定的工作目录。此行为不同于使用 doParallel 创建的集群,后者确实识别工作目录。因此,只需在 R 脚本中添加工作目录和子文件夹的相对路径即可解决问题,即。 e. ./workingdirectory/mysubfolder/ 而不仅仅是 ./mysubfolder/。或者,您也可以使用文件夹的完整路径。

我建议您查看 batchtools 包。它提供了与 R 中的 TORQUE / PBS 交互的方法。

如果您可以使用它的前身 BatchJobs 一段时间,我也建议您尝试一下,当您了解其工作原理后,请查看 doFuture foreach 适配器。这将允许您使用 future.BatchJobs 包。 doFuture、future.BatchJobs 和 BatchJobs 的这种组合允许您在 R 中执行所有操作,而不必担心创建临时 R 脚本等。(免责声明:我是两者的作者)。

设置完成后的示例:

## Tell foreach to use futures
library("doFuture")
registerDoFuture()

## Tell futures to use TORQUE / PBS with help from BatchJobs
library("future.BatchJobs")
plan(batchjobs_torque)

然后你使用:

res <- foreach(i = 1:100) %dopar% {
  my_function(pathname[i], arg1, arg2)
}

这将在单独的 PBS 作业中评估每个迭代,即您将看到 100 个作业添加到队列中。

future.BatchJobs 插图有更多示例和信息。

更新 2017-07-30:future.batchtools 包自 2017 年 5 月起在 CRAN 上。现在推荐这个包超过 future.BatchJobs。用法与上述非常相似,例如而不是 plan(batchjobs_torque) 你现在使用 plan(batchtools_torque).