如何初始化 worker 以并行使用包函数

How to initialize workers to use package functions in parallel

我正在开发一个 R 包并尝试在其中使用并行处理来解决一个令人尴尬的并行问题。我想编写一个循环或函数来使用我的包中的其他函数。我在 Windows 工作,我尝试使用 parallel::parLapplyforeach::%dopar%,但无法让工作人员(核心)访问我的包中的功能。 这是一个包含两个函数的简单包的示例,其中第二个使用 %dopar%:

在并行循环中调用第一个
add10 <- function(x) x + 10

slowadd <- function(m) {
  cl <- parallel::makeCluster(parallel::detectCores() - 1)
  doParallel::registerDoParallel(cl)

  `%dopar%` <- foreach::`%dopar%` # so %dopar% doesn't need to be attached

  foreach::foreach(i = 1:m) %dopar% {
    Sys.sleep(1)
    add10(i)
  }

  stopCluster(cl)
}

当我使用 devtools::load_all() 加载包并调用 slowadd 函数时,返回 Error in { : task 1 failed - "could not find function "add10""

我也试过用我的包显式初始化工作人员:

add10 <- function(x) x + 10

slowadd <- function(m) {
  cl <- parallel::makeCluster(parallel::detectCores() - 1)
  doParallel::registerDoParallel(cl)

  `%dopar%` <- foreach::`%dopar%` # so %dopar% doesn't need to be attached

  foreach::foreach(i = 1:m, .packages = 'mypackage') %dopar% {
    Sys.sleep(1)
    add10(i)
  }

  stopCluster(cl)
}

但我收到错误 Error in e$fun(obj, substitute(ex), parent.frame(), e$data) : worker initialization failed: there is no package called 'mypackage'

如何让工作人员访问我的包中的功能?使用 foreach 的解决方案会很棒,但我完全接受使用 parLapply 或其他 functions/packages.

的解决方案

多亏了人们的帮助评论,我才能够用我的包的功能初始化工作人员。通过确保所有需要的包函数都在 NAMESPACE 中导出并使用 devtools::install() 安装我的包,foreach 能够找到用于初始化的包。该示例的 R 脚本如下所示:

#' @export
add10 <- function(x) x + 10

#' @export
slowadd <- function(m) {
  cl <- parallel::makeCluster(parallel::detectCores() - 1)
  doParallel::registerDoParallel(cl)

  `%dopar%` <- foreach::`%dopar%` # so %dopar% doesn't need to be attached

  out <- foreach::foreach(i = 1:m, .packages = 'mypackage') %dopar% {
    Sys.sleep(1)
    add10(i)
  }

  stopCluster(cl)
  return(out)
} 

这可行,但不是理想的解决方案。首先,它使工作流程慢得多。每次我对包进行更改并想测试它(在合并并行性之前)时,我都使用 devtools::load_all(),但现在我每次都必须重新安装包,当包很大时,这很慢。其次,需要导出并行循环中需要的每个函数,以便 foreach 可以找到它。我的实际用例有很多小的实用功能,我宁愿保留在内部。

您可以在 foreach 循环中使用 devtools::load_all() 或使用 source.

加载您需要的函数
out <- foreach::foreach(i = 1:m ) %dopar% {
    Sys.sleep(1)
    source("R/some_functions.R")
    load("R/sysdata.rda")
    add10(i)
  }