R:异步并行lapply

R: asynchronous parallel lapply

到目前为止,我发现在 R 中使用并行 lapply 的最简单方法是通过以下示例代码:

library(parallel)
library(pbapply)

cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})

results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)

这有一个非常有用的功能,即为结果提供进度条,并且在不需要并行计算时非常容易重用相同的代码,通过设置 cl = NULL

但是,我注意到的一个问题是 pblapply 正在分批循环遍历列表。例如,如果一个工人在某项任务上停留了很长时间,剩下的工人将等待它完成,然后再开始新的一批工作。对于某些任务,这会为工作流程增加很多不必要的时间。

我的问题: 是否有任何类似的并行框架可以让工作人员独立 运行?进度条和使用 cl=NULL 重用代码的能力将是一大优势。

也许可以修改 pbapply 的现有代码以添加此 option/feature?

您可以在多进程模式下使用 furrr package which uses future to run purrr :

library(furrr)
plan(multisession, workers = nbrOfWorkers()-1)
nbrOfWorkers()
1:100 %>% future_map(~{Sys.sleep(1); rnorm(.x)},.progress = T)
Progress: ──────────────────────────────                                   100%

您可以使用 plan(sequential)

关闭并行计算

(免责声明:我是 future framework and the progressr 包的作者)

类似于 base::lapply() 和您的 pbapply::pblapply() 示例的近似解决方案是将 future.apply 用作:

library(future.apply)

## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
})

分块: 您可以使用参数 future.chunk.size 或补充 future.schedule 来控制分块的数量。要禁用分块以使每个元素在唯一的并行任务中处理,请使用 future.chunk.size=1。这样,如果有一个元素比其他元素花费的时间长得多,它就不会阻止任何其他元素。

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}, future.chunk.size=1)

并行更新: 如果你想在并行处理时接收进度更新,你可以使用 progressr package and configure it to use the progress 包将更新报告为进度条(这里也有一个 ETA)。

library(future.apply)
plan(multisession, workers=4)

library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))

with_progress({
  p <- progressor(along=xs)
  results <- future_lapply(xs, FUN=function(x) {
    p()  ## signal progress
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
})

你可以把它包装成一个函数,例如

my_fcn <- function(xs) {
  p <- progressor(along=xs)
  future_lapply(xs, FUN=function(x) {
    p()
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
}

这样你就可以像普通函数一样调用它了:

> result <- my_fcn(xs)

并使用 plan() 来准确控制您希望它如何并行化。这不会报告进度。为此,您必须执行以下操作:

> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------]   9%  1m

运行 一切都在后台: 如果您的问题是如何 运行 整个 shebang 在后台,请参阅'Future Topologies' 小插图。这是另一个级别的并行化,但它是可能的。