R:异步并行lapply
R: asynchronous parallel lapply
到目前为止,我发现在 R 中使用并行 lapply
的最简单方法是通过以下示例代码:
library(parallel)
library(pbapply)
cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})
results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)
这有一个非常有用的功能,即为结果提供进度条,并且在不需要并行计算时非常容易重用相同的代码,通过设置 cl = NULL
。
但是,我注意到的一个问题是 pblapply
正在分批循环遍历列表。例如,如果一个工人在某项任务上停留了很长时间,剩下的工人将等待它完成,然后再开始新的一批工作。对于某些任务,这会为工作流程增加很多不必要的时间。
我的问题:
是否有任何类似的并行框架可以让工作人员独立 运行?进度条和使用 cl=NULL
重用代码的能力将是一大优势。
也许可以修改 pbapply
的现有代码以添加此 option/feature?
您可以在多进程模式下使用 furrr
package which uses future
to run purrr
:
library(furrr)
plan(multisession, workers = nbrOfWorkers()-1)
nbrOfWorkers()
1:100 %>% future_map(~{Sys.sleep(1); rnorm(.x)},.progress = T)
Progress: ────────────────────────────── 100%
您可以使用 plan(sequential)
关闭并行计算
(免责声明:我是 future framework and the progressr 包的作者)
类似于 base::lapply()
和您的 pbapply::pblapply()
示例的近似解决方案是将 future.apply 用作:
library(future.apply)
## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
})
分块:
您可以使用参数 future.chunk.size
或补充 future.schedule
来控制分块的数量。要禁用分块以使每个元素在唯一的并行任务中处理,请使用 future.chunk.size=1
。这样,如果有一个元素比其他元素花费的时间长得多,它就不会阻止任何其他元素。
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
并行更新:
如果你想在并行处理时接收进度更新,你可以使用 progressr package and configure it to use the progress 包将更新报告为进度条(这里也有一个 ETA)。
library(future.apply)
plan(multisession, workers=4)
library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))
with_progress({
p <- progressor(along=xs)
results <- future_lapply(xs, FUN=function(x) {
p() ## signal progress
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
})
你可以把它包装成一个函数,例如
my_fcn <- function(xs) {
p <- progressor(along=xs)
future_lapply(xs, FUN=function(x) {
p()
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
}
这样你就可以像普通函数一样调用它了:
> result <- my_fcn(xs)
并使用 plan()
来准确控制您希望它如何并行化。这不会报告进度。为此,您必须执行以下操作:
> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------] 9% 1m
运行 一切都在后台: 如果您的问题是如何 运行 整个 shebang 在后台,请参阅'Future Topologies' 小插图。这是另一个级别的并行化,但它是可能的。
到目前为止,我发现在 R 中使用并行 lapply
的最简单方法是通过以下示例代码:
library(parallel)
library(pbapply)
cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})
results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)
这有一个非常有用的功能,即为结果提供进度条,并且在不需要并行计算时非常容易重用相同的代码,通过设置 cl = NULL
。
但是,我注意到的一个问题是 pblapply
正在分批循环遍历列表。例如,如果一个工人在某项任务上停留了很长时间,剩下的工人将等待它完成,然后再开始新的一批工作。对于某些任务,这会为工作流程增加很多不必要的时间。
我的问题:
是否有任何类似的并行框架可以让工作人员独立 运行?进度条和使用 cl=NULL
重用代码的能力将是一大优势。
也许可以修改 pbapply
的现有代码以添加此 option/feature?
您可以在多进程模式下使用 furrr
package which uses future
to run purrr
:
library(furrr)
plan(multisession, workers = nbrOfWorkers()-1)
nbrOfWorkers()
1:100 %>% future_map(~{Sys.sleep(1); rnorm(.x)},.progress = T)
Progress: ────────────────────────────── 100%
您可以使用 plan(sequential)
(免责声明:我是 future framework and the progressr 包的作者)
类似于 base::lapply()
和您的 pbapply::pblapply()
示例的近似解决方案是将 future.apply 用作:
library(future.apply)
## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
})
分块:
您可以使用参数 future.chunk.size
或补充 future.schedule
来控制分块的数量。要禁用分块以使每个元素在唯一的并行任务中处理,请使用 future.chunk.size=1
。这样,如果有一个元素比其他元素花费的时间长得多,它就不会阻止任何其他元素。
xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
并行更新: 如果你想在并行处理时接收进度更新,你可以使用 progressr package and configure it to use the progress 包将更新报告为进度条(这里也有一个 ETA)。
library(future.apply)
plan(multisession, workers=4)
library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))
with_progress({
p <- progressor(along=xs)
results <- future_lapply(xs, FUN=function(x) {
p() ## signal progress
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
})
你可以把它包装成一个函数,例如
my_fcn <- function(xs) {
p <- progressor(along=xs)
future_lapply(xs, FUN=function(x) {
p()
Sys.sleep(0.1)
sqrt(x)
}, future.chunk.size=1)
}
这样你就可以像普通函数一样调用它了:
> result <- my_fcn(xs)
并使用 plan()
来准确控制您希望它如何并行化。这不会报告进度。为此,您必须执行以下操作:
> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------] 9% 1m
运行 一切都在后台: 如果您的问题是如何 运行 整个 shebang 在后台,请参阅'Future Topologies' 小插图。这是另一个级别的并行化,但它是可能的。