使我的 R 包中的函数可并行化的最佳做法是什么?

What is the best practice for making functions in my R package parallelizable?

我开发了一个包含 embarassingly parallel 函数的 R 包。

我想以对用户透明的方式为这些函数实现并行化,而不考虑 his/her OS(至少理想情况下)。

我环顾四周,看看其他包作者是如何在用户上导入 foreach-based Parallelism. For example, Max Kuhn's caret package imports foreach to use %dopar% but relies 来指定并行后端的。 (几个示例使用 doMC,它不适用于 Windows。)

注意到 doParallel works for Windows and Linux/OSX and uses the built-in parallel package (see comments 用于有用的讨论),只要用户指定 parallel=TRUE 作为参数,导入 doParallel 并让我的函数调用 registerDoParallel() 是否有意义?

我认为允许用户注册自己的并行后端非常重要。 doParallel 后端非常便携,但如果他们想 运行 您的函数在集群的多个节点上运行怎么办?如果他们想设置 makeCluster "outfile" 选项怎么办?不幸的是,如果使并行支持透明化也会使它对您的许多用户无用。

我建议你使用getDoParRegistered功能查看用户是否已经注册了并行后端,如果没有则只为他们注册一个。

这是一个例子:

library(doParallel)
parfun <- function(n=10, parallel=FALSE,
                   cores=getOption('mc.cores', 2L)) {
  if (parallel) {
    # honor registration made by user, and only create and register
    # our own cluster object once
    if (! getDoParRegistered()) {
      cl <- makePSOCKcluster(cores)
      registerDoParallel(cl)
      message('Registered doParallel with ',
              cores, ' workers')
    } else {
      message('Using ', getDoParName(), ' with ',
              getDoParWorkers(), ' workers')
    }
    `%d%` <- `%dopar%`
  } else {
    message('Executing parfun sequentially')
    `%d%` <- `%do%`
  }

  foreach(i=seq_len(n), .combine='c') %d% {
    Sys.sleep(1)
    i
  }
}

如果 parallel=TRUE,这是这样写的,即使他们注册了一个并行后端,它也只有 运行s 并行:

> parfun()
Executing parfun sequentially
 [1]  1  2  3  4  5  6  7  8  9 10

如果parallel=TRUE并且他们还没有注册后端,那么它将为他们创建并注册一个集群对象:

> parfun(parallel=TRUE, cores=3)
Registered doParallel with 3 workers
 [1]  1  2  3  4  5  6  7  8  9 10

如果再次用parallel=TRUE调用parfun,它将使用之前注册的集群:

> parfun(parallel=TRUE)
Using doParallelSNOW with 3 workers
 [1]  1  2  3  4  5  6  7  8  9 10

这可以通过多种方式进行细化:这只是一个简单的演示。但至少它提供了一种便利,而不会阻止用户使用其环境中可能需要的自定义选项注册不同的后端。


请注意,默认数量 cores/workers 的选择也是一个棘手的问题,也是 CRAN 维护人员关心的问题。这就是为什么我没有设置默认内核数 detectCores() 的原因。相反,我正在使用 mclapply 使用的方法,尽管也许应该使用不同的选项名称。


关于stopCluster

请注意,此示例有时会创建一个新的集群对象,但它绝不会通过调用 stopCluster 来停止它。原因是创建集群对象可能很昂贵,所以我喜欢在多个 foreach 循环中重用它们,而不是每次都创建和销毁它们。我宁愿把它留给用户,但是,在这个例子中,用户没有办法这样做,因为他们无权访问 cl 变量。

可以通过三种方式处理:

  • 每当调用 makePSOCKcluster 时,在 parfun 中调用 stopCluster
  • 编写一个附加函数,允许用户停止隐式创建的集群对象(相当于doParallel包中的stopImplicitCluster函数);
  • 不用担心隐式创建的集群对象。

我可能会为我自己的代码选择第二个选项,但这会使这个示例变得非常复杂。已经很复杂了。

作为未来包的作者,推荐大家看看。 future 包将所有 parallel 的并行/集群功能统一到一个 API.

https://cran.r-project.org/package=future

它的设计让您作为开发人员编写一次代码,然后由用户决定后端,例如plan(multiprocess)plan(cluster, workers = c("n1", "n3", "remote.server.org"))

如果用户可以使用 Slurm、TORQUE/PBS 和 SGE 等常用调度程序之一访问 HPC 集群,则他们可以使用 future.BatchJobs 包实现未来 API 在 BatchJobs 之上,例如plan(batchjobs_slurm)。您的代码保持不变。 (很快还会有 future.batchtools 包在 batchtools 之上)。