如何在多个 R 作业 运行 并行完成后 运行 另一个 Rscript?

How to run another Rscript after several R jobs running in parallel are done?

关于我需要如何 运行 我的脚本的安排是首先使用 rstudioapi::jobRunScript() 函数并行 运行 4 个 R 脚本。 运行 并行的每个脚本不会从任何环境导入任何内容,而是将创建的数据帧导出到全局环境。 我的第 5 个 R 脚本建立在由 运行 并行 的 4 个 R 脚本创建的数据帧的基础上,这第 5 个脚本也在控制台中 运行ning。如果有办法 运行 在后台 第 5 个脚本,而不是在前 4 个 R 脚本并行完成 运行 之后在控制台中,那将会好很多。我也在尝试减少整个过程的总 运行ning 时间。

虽然我能够弄清楚如何并行地 运行 前 4 个 R 脚本,但我的任务还没有完全完成,因为我找不到如何触发 运行我的第 5 个 R 脚本。希望大家能帮帮我

这对我来说有点太开放了。虽然 rstudioapi 绝对可以用于 运行ning 并行任务,但它不是很通用并且不会为您提供非常有用的输出。 parallel Universe 在 R 中得到了很好的实现,有几个包提供了一个更简单和更好的接口来执行此操作。这里有 3 个选项,它们还允许来自不同文件的内容 'output'。

包=并行

使用并行包,我们可以非常简单地实现这一点。只需创建一个要获取的文件向量并在每个线程中执行 source。主进程将在 运行ning 时锁定,但如果您无论如何都必须等待它们完成,这并不重要。

library(parallel)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
# full path to file that should execute 
files <- c(...) 
# use an lapply in parallel.
result <- parLapply(cl, files, source)
# Remember to close the cluster
stopCluster(cl)
# If anything is returned this can now be used.

作为旁注,一些包具有与 parallel 包相似的接口,后者是在 snow 包的基础上构建的,因此这是一个很好的了解基础。

包=foreach

parallel 包的替代方案是 foreach 包,它提供了类似于 for-loop 接口的东西,简化了接口,同时提供了更大的灵活性并自动导入必要的库和变量(尽管手动执行此操作更安全)。
foreach 软件包确实依赖于 paralleldoParallel 软件包来设置集群,但是

library(parallel)
library(doParallel)
library(foreach)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
files <- c(...) 
registerDoParallel(cl)
# Run parallel using foreach
# remember %dopar% for parallel. %do% for sequential.
result <- foreach(file = files, .combine = list, .multicombine = TRUE) %dopar% { 
  source(file)
  # Add any code before or after source.
}
# Stop cluster
stopCluster(cl)
# Do more stuff. Result holds any result returned by foreach.

虽然它确实添加了几行代码,但 .combine.packages.export 为 R 中的并行计算提供了一个非常简单的接口。

包=未来

现在这是要使用的比较少见的软件包之一。 future 提供了一个比 parallelforeach 更灵活的并行接口,允许异步并行编程。然而,实施似乎有点令人生畏,而我在下面提供的示例只是触及了可能的表面。
另外值得一提的是,虽然 future 包确实提供了 运行 代码所需的函数和包的自动导入,但经验让我意识到这仅限于任何调用的第一层深度(有时更少),因为这样的导出仍然是必要的。
虽然 foreach 依赖于 parallel(或类似的)来启动集群,但 foreach 将使用所有可用的核心自行启动一个集群。对 plan(multiprocess) 的简单调用将启动多核会话。

library(future)
files <- c(...) 
# Start multiprocess session
plan(multiprocess)
# Simple wrapper function, so we can iterate over the files variable easier
source_future <- function(file)
  future(file)
results <- lapply(files, source_future)
# Do some calculations in the meantime
print('hello world, I am running while waiting for the futures to finish')
# Force waiting for the futures to finish
resolve(results)
# Extract any result from the futures
results <- values(results)
# Clean up the process (close down clusters)
plan(sequential)
# Run some more code.

乍一看这似乎很沉重,但一般机制是:

  1. 呼叫plan(multiprocess)
  2. 使用 future(或 %<-%,我不会深入)执行一些功能
  3. 如果您有更多代码要 运行,请执行其他操作,这不依赖于进程
  4. 使用 resolve 等待结果,它适用于列表(或环境)中的单个未来或多个未来
  5. 对单个期货使用 value 或对列表(或环境)中的多个期货使用 values 收集结果
  6. 使用 plan(sequential)
  7. 清除 future 环境中的任何集群 运行ning
  8. 继续使用取决于您的期货结果的代码。

我相信这 3 个包为任何用户需要与之交互的多处理的每个必要元素(至少在 CPU 上)提供了接口。其他包提供替代接口,而对于异步,我只知道 futurepromises。总的来说,我建议大多数用户在进入异步编程时要非常小心,因为这可能会导致一整套问题,与同步并行编程相比,这些问题的发生频率较低。

我希望这可能有助于为(非常有限的)rstudioapi 接口提供一个替代方案,我相当确定该接口从未打算由用户自己用于并行编程,但更有可能用于通过接口本身执行并行构建包等任务。

我不知道这对您当前的情况有多适应,但这里有一种方法可以并行处理四件事 运行,获取它们的 return 值,然后触发第五个expression/function.

前提是 callr::r_bg 用于 运行 单个文件。这实际上 运行 是一个 function,而不是一个文件,所以我将修改对这些文件的预期 一点点 .

我将编写一个辅助脚本,旨在模仿您的四个脚本之一。我猜你也希望能够正常获取它(运行 它直接而不是作为一个函数),所以我将生成脚本文件,以便它“知道”它是被获取还是 运行 直接(基于 )。 (如果你知道 python,这类似于 python 的 if __name__ == "__main__" 技巧。)

名为 somescript.R 的辅助脚本。

somefunc <- function(seconds) {
  # put the contents of a script file in this function, and have
  # it return() the data you need back in the calling environment
  Sys.sleep(seconds)
  return(mtcars[sample(nrow(mtcars),2),1:3])
}

if (sys.nframe() == 0L) {
  # if we're here, the script is being Rscript'd, not source'd
  somefunc(3)
}

作为演示,如果在控制台上source,这个只是定义函数(或者多个,如果你愿意),它不执行最后 if 块中的代码:

system.time(source("~/Whosebug/14182669/somescript.R"))
#                                   # <--- no output, it did not return a sample from mtcars
#    user  system elapsed 
#       0       0       0           # <--- no time passed

但是如果我在终端中 运行 使用 Rscript

$ time /c/R/R-4.0.2/bin/x64/Rscript somescript.R
               mpg cyl  disp
Merc 280C     17.8   6 167.6
Mazda RX4 Wag 21.0   6 160.0

real    0m3.394s                    # <--- 3 second sleep
user    0m0.000s
sys     0m0.015s

回到前提。而不是四个“脚本”,像我上面的 somescript.R 一样重写你的脚本文件。如果操作正确,它们可以 Rscripted 以及 sourced 以不同的意图。

我将使用这个脚本四次而不是四个脚本。这是我们想要自动化的手册 run-through:

# library(callr)
tasks <- list(
  callr::r_bg(somefunc, args = list(5)),
  callr::r_bg(somefunc, args = list(1)),
  callr::r_bg(somefunc, args = list(10)),
  callr::r_bg(somefunc, args = list(3))
)
sapply(tasks, function(tk) tk$is_alive())
# [1]  TRUE FALSE  TRUE FALSE
### time passes
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE  TRUE FALSE
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE FALSE FALSE

tasks[[1]]$get_result()
#                    mpg cyl  disp  hp drat    wt  qsec vs am gear carb
# Merc 280          19.2   6 167.6 123 3.92 3.440 18.30  1  0    4    4
# Chrysler Imperial 14.7   8 440.0 230 3.23 5.345 17.42  0  0    3    4

我们可以使用

将其自动化
source("somescript.R")
message(Sys.time(), " starting")
# 2020-08-28 07:45:31 starting
tasks <- list(
  callr::r_bg(somefunc, args = list(5)),
  callr::r_bg(somefunc, args = list(1)),
  callr::r_bg(somefunc, args = list(10)),
  callr::r_bg(somefunc, args = list(3))
)
# some reasonable time-between-checks
while (any(sapply(tasks, function(tk) tk$is_alive()))) {
  message(Sys.time(), " still waiting")
  Sys.sleep(1)                      # <-- over to you for a reasonable poll interval
}
# 2020-08-28 07:45:32 still waiting
# 2020-08-28 07:45:33 still waiting
# 2020-08-28 07:45:34 still waiting
# 2020-08-28 07:45:35 still waiting
# 2020-08-28 07:45:36 still waiting
# 2020-08-28 07:45:37 still waiting
# 2020-08-28 07:45:38 still waiting
# 2020-08-28 07:45:39 still waiting
# 2020-08-28 07:45:40 still waiting
# 2020-08-28 07:45:41 still waiting
message(Sys.time(), " done!")
# 2020-08-28 07:45:43 done!
results <- lapply(tasks, function(tk) tk$get_result())
str(results)
# List of 4
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 24.4 32.4
#   ..$ cyl : num [1:2] 4 4
#   ..$ disp: num [1:2] 146.7 78.7
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 30.4 14.3
#   ..$ cyl : num [1:2] 4 8
#   ..$ disp: num [1:2] 95.1 360
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 15.2 15.8
#   ..$ cyl : num [1:2] 8 8
#   ..$ disp: num [1:2] 276 351
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 14.3 15.2
#   ..$ cyl : num [1:2] 8 8
#   ..$ disp: num [1:2] 360 304

现在 运行 你的第五个 function/script。

您可以将 promisesfuture 结合使用:promises::promise_all 后跟 promises::then 允许在启动最后一个之前等待前一个 futures 完成一个作为后台进程。
当作业在后台 运行 时,您可以控制控制台。

library(future)
library(promises)

plan(multisession)
# Job1
fJob1<- future({
  # Simulate a short duration job
  Sys.sleep(3)
  cat('Job1 done \n')
})

# Job2
fJob2<- future({
  # Simulate a medium duration job
  Sys.sleep(6)
  cat('Job2 done \n')
})


# Job3
fJob3<- future({
  # Simulate a long duration job
  Sys.sleep(10)
  cat('Job3 done \n')
})

# last Job
runLastJob <- function(res) {
  cat('Last job launched \n')
  # launch here script for last job
}

# Cancel last Job
cancelLastJob <- function(res) {
  cat('Last job not launched \n')
}

#  Wait for all jobs to be completed and launch last job
p_wait_all <- promises::promise_all(fJob1, fJob2, fJob3 )
promises::then(p_wait_all,  onFulfilled = runLastJob, onRejected = cancelLastJob)

Job1 done 
Job2 done 
Job3 done 
Last job launched