如何在多个 R 作业 运行 并行完成后 运行 另一个 Rscript?
How to run another Rscript after several R jobs running in parallel are done?
关于我需要如何 运行 我的脚本的安排是首先使用 rstudioapi::jobRunScript()
函数并行 运行 4 个 R 脚本。 运行 并行的每个脚本不会从任何环境导入任何内容,而是将创建的数据帧导出到全局环境。 我的第 5 个 R 脚本建立在由 运行 并行 的 4 个 R 脚本创建的数据帧的基础上,这第 5 个脚本也在控制台中 运行ning。如果有办法 运行 在后台 第 5 个脚本,而不是在前 4 个 R 脚本并行完成 运行 之后在控制台中,那将会好很多。我也在尝试减少整个过程的总 运行ning 时间。
虽然我能够弄清楚如何并行地 运行 前 4 个 R 脚本,但我的任务还没有完全完成,因为我找不到如何触发 运行我的第 5 个 R 脚本。希望大家能帮帮我
这对我来说有点太开放了。虽然 rstudioapi
绝对可以用于 运行ning 并行任务,但它不是很通用并且不会为您提供非常有用的输出。 parallel
Universe 在 R 中得到了很好的实现,有几个包提供了一个更简单和更好的接口来执行此操作。这里有 3 个选项,它们还允许来自不同文件的内容 'output'。
包=并行
使用并行包,我们可以非常简单地实现这一点。只需创建一个要获取的文件向量并在每个线程中执行 source
。主进程将在 运行ning 时锁定,但如果您无论如何都必须等待它们完成,这并不重要。
library(parallel)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
# full path to file that should execute
files <- c(...)
# use an lapply in parallel.
result <- parLapply(cl, files, source)
# Remember to close the cluster
stopCluster(cl)
# If anything is returned this can now be used.
作为旁注,一些包具有与 parallel
包相似的接口,后者是在 snow
包的基础上构建的,因此这是一个很好的了解基础。
包=foreach
parallel
包的替代方案是 foreach
包,它提供了类似于 for-loop
接口的东西,简化了接口,同时提供了更大的灵活性并自动导入必要的库和变量(尽管手动执行此操作更安全)。
foreach
软件包确实依赖于 parallel
和 doParallel
软件包来设置集群,但是
library(parallel)
library(doParallel)
library(foreach)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
files <- c(...)
registerDoParallel(cl)
# Run parallel using foreach
# remember %dopar% for parallel. %do% for sequential.
result <- foreach(file = files, .combine = list, .multicombine = TRUE) %dopar% {
source(file)
# Add any code before or after source.
}
# Stop cluster
stopCluster(cl)
# Do more stuff. Result holds any result returned by foreach.
虽然它确实添加了几行代码,但 .combine
、.packages
和 .export
为 R 中的并行计算提供了一个非常简单的接口。
包=未来
现在这是要使用的比较少见的软件包之一。 future
提供了一个比 parallel
和 foreach
更灵活的并行接口,允许异步并行编程。然而,实施似乎有点令人生畏,而我在下面提供的示例只是触及了可能的表面。
另外值得一提的是,虽然 future
包确实提供了 运行 代码所需的函数和包的自动导入,但经验让我意识到这仅限于任何调用的第一层深度(有时更少),因为这样的导出仍然是必要的。
虽然 foreach
依赖于 parallel
(或类似的)来启动集群,但 foreach
将使用所有可用的核心自行启动一个集群。对 plan(multiprocess)
的简单调用将启动多核会话。
library(future)
files <- c(...)
# Start multiprocess session
plan(multiprocess)
# Simple wrapper function, so we can iterate over the files variable easier
source_future <- function(file)
future(file)
results <- lapply(files, source_future)
# Do some calculations in the meantime
print('hello world, I am running while waiting for the futures to finish')
# Force waiting for the futures to finish
resolve(results)
# Extract any result from the futures
results <- values(results)
# Clean up the process (close down clusters)
plan(sequential)
# Run some more code.
乍一看这似乎很沉重,但一般机制是:
- 呼叫
plan(multiprocess)
- 使用
future
(或 %<-%
,我不会深入)执行一些功能
- 如果您有更多代码要 运行,请执行其他操作,这不依赖于进程
- 使用
resolve
等待结果,它适用于列表(或环境)中的单个未来或多个未来
- 对单个期货使用
value
或对列表(或环境)中的多个期货使用 values
收集结果
- 使用
plan(sequential)
清除 future
环境中的任何集群 运行ning
- 继续使用取决于您的期货结果的代码。
我相信这 3 个包为任何用户需要与之交互的多处理的每个必要元素(至少在 CPU 上)提供了接口。其他包提供替代接口,而对于异步,我只知道 future
和 promises
。总的来说,我建议大多数用户在进入异步编程时要非常小心,因为这可能会导致一整套问题,与同步并行编程相比,这些问题的发生频率较低。
我希望这可能有助于为(非常有限的)rstudioapi
接口提供一个替代方案,我相当确定该接口从未打算由用户自己用于并行编程,但更有可能用于通过接口本身执行并行构建包等任务。
我不知道这对您当前的情况有多适应,但这里有一种方法可以并行处理四件事 运行,获取它们的 return 值,然后触发第五个expression/function.
前提是 callr::r_bg
用于 运行 单个文件。这实际上 运行 是一个 function
,而不是一个文件,所以我将修改对这些文件的预期 一点点 .
我将编写一个辅助脚本,旨在模仿您的四个脚本之一。我猜你也希望能够正常获取它(运行 它直接而不是作为一个函数),所以我将生成脚本文件,以便它“知道”它是被获取还是 运行 直接(基于 )。 (如果你知道 python
,这类似于 python 的 if __name__ == "__main__"
技巧。)
名为 somescript.R
的辅助脚本。
somefunc <- function(seconds) {
# put the contents of a script file in this function, and have
# it return() the data you need back in the calling environment
Sys.sleep(seconds)
return(mtcars[sample(nrow(mtcars),2),1:3])
}
if (sys.nframe() == 0L) {
# if we're here, the script is being Rscript'd, not source'd
somefunc(3)
}
作为演示,如果在控制台上source
,这个只是定义函数(或者多个,如果你愿意),它不执行最后 if
块中的代码:
system.time(source("~/Whosebug/14182669/somescript.R"))
# # <--- no output, it did not return a sample from mtcars
# user system elapsed
# 0 0 0 # <--- no time passed
但是如果我在终端中 运行 使用 Rscript
,
$ time /c/R/R-4.0.2/bin/x64/Rscript somescript.R
mpg cyl disp
Merc 280C 17.8 6 167.6
Mazda RX4 Wag 21.0 6 160.0
real 0m3.394s # <--- 3 second sleep
user 0m0.000s
sys 0m0.015s
回到前提。而不是四个“脚本”,像我上面的 somescript.R
一样重写你的脚本文件。如果操作正确,它们可以 Rscript
ed 以及 source
d 以不同的意图。
我将使用这个脚本四次而不是四个脚本。这是我们想要自动化的手册 run-through:
# library(callr)
tasks <- list(
callr::r_bg(somefunc, args = list(5)),
callr::r_bg(somefunc, args = list(1)),
callr::r_bg(somefunc, args = list(10)),
callr::r_bg(somefunc, args = list(3))
)
sapply(tasks, function(tk) tk$is_alive())
# [1] TRUE FALSE TRUE FALSE
### time passes
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE TRUE FALSE
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE FALSE FALSE
tasks[[1]]$get_result()
# mpg cyl disp hp drat wt qsec vs am gear carb
# Merc 280 19.2 6 167.6 123 3.92 3.440 18.30 1 0 4 4
# Chrysler Imperial 14.7 8 440.0 230 3.23 5.345 17.42 0 0 3 4
我们可以使用
将其自动化
source("somescript.R")
message(Sys.time(), " starting")
# 2020-08-28 07:45:31 starting
tasks <- list(
callr::r_bg(somefunc, args = list(5)),
callr::r_bg(somefunc, args = list(1)),
callr::r_bg(somefunc, args = list(10)),
callr::r_bg(somefunc, args = list(3))
)
# some reasonable time-between-checks
while (any(sapply(tasks, function(tk) tk$is_alive()))) {
message(Sys.time(), " still waiting")
Sys.sleep(1) # <-- over to you for a reasonable poll interval
}
# 2020-08-28 07:45:32 still waiting
# 2020-08-28 07:45:33 still waiting
# 2020-08-28 07:45:34 still waiting
# 2020-08-28 07:45:35 still waiting
# 2020-08-28 07:45:36 still waiting
# 2020-08-28 07:45:37 still waiting
# 2020-08-28 07:45:38 still waiting
# 2020-08-28 07:45:39 still waiting
# 2020-08-28 07:45:40 still waiting
# 2020-08-28 07:45:41 still waiting
message(Sys.time(), " done!")
# 2020-08-28 07:45:43 done!
results <- lapply(tasks, function(tk) tk$get_result())
str(results)
# List of 4
# $ :'data.frame': 2 obs. of 3 variables:
# ..$ mpg : num [1:2] 24.4 32.4
# ..$ cyl : num [1:2] 4 4
# ..$ disp: num [1:2] 146.7 78.7
# $ :'data.frame': 2 obs. of 3 variables:
# ..$ mpg : num [1:2] 30.4 14.3
# ..$ cyl : num [1:2] 4 8
# ..$ disp: num [1:2] 95.1 360
# $ :'data.frame': 2 obs. of 3 variables:
# ..$ mpg : num [1:2] 15.2 15.8
# ..$ cyl : num [1:2] 8 8
# ..$ disp: num [1:2] 276 351
# $ :'data.frame': 2 obs. of 3 variables:
# ..$ mpg : num [1:2] 14.3 15.2
# ..$ cyl : num [1:2] 8 8
# ..$ disp: num [1:2] 360 304
现在 运行 你的第五个 function/script。
您可以将 promises
与 future
结合使用:promises::promise_all
后跟 promises::then
允许在启动最后一个之前等待前一个 futures
完成一个作为后台进程。
当作业在后台 运行 时,您可以控制控制台。
library(future)
library(promises)
plan(multisession)
# Job1
fJob1<- future({
# Simulate a short duration job
Sys.sleep(3)
cat('Job1 done \n')
})
# Job2
fJob2<- future({
# Simulate a medium duration job
Sys.sleep(6)
cat('Job2 done \n')
})
# Job3
fJob3<- future({
# Simulate a long duration job
Sys.sleep(10)
cat('Job3 done \n')
})
# last Job
runLastJob <- function(res) {
cat('Last job launched \n')
# launch here script for last job
}
# Cancel last Job
cancelLastJob <- function(res) {
cat('Last job not launched \n')
}
# Wait for all jobs to be completed and launch last job
p_wait_all <- promises::promise_all(fJob1, fJob2, fJob3 )
promises::then(p_wait_all, onFulfilled = runLastJob, onRejected = cancelLastJob)
Job1 done
Job2 done
Job3 done
Last job launched
关于我需要如何 运行 我的脚本的安排是首先使用 rstudioapi::jobRunScript()
函数并行 运行 4 个 R 脚本。 运行 并行的每个脚本不会从任何环境导入任何内容,而是将创建的数据帧导出到全局环境。 我的第 5 个 R 脚本建立在由 运行 并行 的 4 个 R 脚本创建的数据帧的基础上,这第 5 个脚本也在控制台中 运行ning。如果有办法 运行 在后台 第 5 个脚本,而不是在前 4 个 R 脚本并行完成 运行 之后在控制台中,那将会好很多。我也在尝试减少整个过程的总 运行ning 时间。
虽然我能够弄清楚如何并行地 运行 前 4 个 R 脚本,但我的任务还没有完全完成,因为我找不到如何触发 运行我的第 5 个 R 脚本。希望大家能帮帮我
这对我来说有点太开放了。虽然 rstudioapi
绝对可以用于 运行ning 并行任务,但它不是很通用并且不会为您提供非常有用的输出。 parallel
Universe 在 R 中得到了很好的实现,有几个包提供了一个更简单和更好的接口来执行此操作。这里有 3 个选项,它们还允许来自不同文件的内容 'output'。
包=并行
使用并行包,我们可以非常简单地实现这一点。只需创建一个要获取的文件向量并在每个线程中执行 source
。主进程将在 运行ning 时锁定,但如果您无论如何都必须等待它们完成,这并不重要。
library(parallel)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
# full path to file that should execute
files <- c(...)
# use an lapply in parallel.
result <- parLapply(cl, files, source)
# Remember to close the cluster
stopCluster(cl)
# If anything is returned this can now be used.
作为旁注,一些包具有与 parallel
包相似的接口,后者是在 snow
包的基础上构建的,因此这是一个很好的了解基础。
包=foreach
parallel
包的替代方案是 foreach
包,它提供了类似于 for-loop
接口的东西,简化了接口,同时提供了更大的灵活性并自动导入必要的库和变量(尽管手动执行此操作更安全)。
foreach
软件包确实依赖于 parallel
和 doParallel
软件包来设置集群,但是
library(parallel)
library(doParallel)
library(foreach)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
files <- c(...)
registerDoParallel(cl)
# Run parallel using foreach
# remember %dopar% for parallel. %do% for sequential.
result <- foreach(file = files, .combine = list, .multicombine = TRUE) %dopar% {
source(file)
# Add any code before or after source.
}
# Stop cluster
stopCluster(cl)
# Do more stuff. Result holds any result returned by foreach.
虽然它确实添加了几行代码,但 .combine
、.packages
和 .export
为 R 中的并行计算提供了一个非常简单的接口。
包=未来
现在这是要使用的比较少见的软件包之一。 future
提供了一个比 parallel
和 foreach
更灵活的并行接口,允许异步并行编程。然而,实施似乎有点令人生畏,而我在下面提供的示例只是触及了可能的表面。
另外值得一提的是,虽然 future
包确实提供了 运行 代码所需的函数和包的自动导入,但经验让我意识到这仅限于任何调用的第一层深度(有时更少),因为这样的导出仍然是必要的。
虽然 foreach
依赖于 parallel
(或类似的)来启动集群,但 foreach
将使用所有可用的核心自行启动一个集群。对 plan(multiprocess)
的简单调用将启动多核会话。
library(future)
files <- c(...)
# Start multiprocess session
plan(multiprocess)
# Simple wrapper function, so we can iterate over the files variable easier
source_future <- function(file)
future(file)
results <- lapply(files, source_future)
# Do some calculations in the meantime
print('hello world, I am running while waiting for the futures to finish')
# Force waiting for the futures to finish
resolve(results)
# Extract any result from the futures
results <- values(results)
# Clean up the process (close down clusters)
plan(sequential)
# Run some more code.
乍一看这似乎很沉重,但一般机制是:
- 呼叫
plan(multiprocess)
- 使用
future
(或%<-%
,我不会深入)执行一些功能 - 如果您有更多代码要 运行,请执行其他操作,这不依赖于进程
- 使用
resolve
等待结果,它适用于列表(或环境)中的单个未来或多个未来 - 对单个期货使用
value
或对列表(或环境)中的多个期货使用values
收集结果 - 使用
plan(sequential)
清除 - 继续使用取决于您的期货结果的代码。
future
环境中的任何集群 运行ning
我相信这 3 个包为任何用户需要与之交互的多处理的每个必要元素(至少在 CPU 上)提供了接口。其他包提供替代接口,而对于异步,我只知道 future
和 promises
。总的来说,我建议大多数用户在进入异步编程时要非常小心,因为这可能会导致一整套问题,与同步并行编程相比,这些问题的发生频率较低。
我希望这可能有助于为(非常有限的)rstudioapi
接口提供一个替代方案,我相当确定该接口从未打算由用户自己用于并行编程,但更有可能用于通过接口本身执行并行构建包等任务。
我不知道这对您当前的情况有多适应,但这里有一种方法可以并行处理四件事 运行,获取它们的 return 值,然后触发第五个expression/function.
前提是 callr::r_bg
用于 运行 单个文件。这实际上 运行 是一个 function
,而不是一个文件,所以我将修改对这些文件的预期 一点点 .
我将编写一个辅助脚本,旨在模仿您的四个脚本之一。我猜你也希望能够正常获取它(运行 它直接而不是作为一个函数),所以我将生成脚本文件,以便它“知道”它是被获取还是 运行 直接(基于 python
,这类似于 python 的 if __name__ == "__main__"
技巧。)
名为 somescript.R
的辅助脚本。
somefunc <- function(seconds) {
# put the contents of a script file in this function, and have
# it return() the data you need back in the calling environment
Sys.sleep(seconds)
return(mtcars[sample(nrow(mtcars),2),1:3])
}
if (sys.nframe() == 0L) {
# if we're here, the script is being Rscript'd, not source'd
somefunc(3)
}
作为演示,如果在控制台上source
,这个只是定义函数(或者多个,如果你愿意),它不执行最后 if
块中的代码:
system.time(source("~/Whosebug/14182669/somescript.R"))
# # <--- no output, it did not return a sample from mtcars
# user system elapsed
# 0 0 0 # <--- no time passed
但是如果我在终端中 运行 使用 Rscript
,
$ time /c/R/R-4.0.2/bin/x64/Rscript somescript.R
mpg cyl disp
Merc 280C 17.8 6 167.6
Mazda RX4 Wag 21.0 6 160.0
real 0m3.394s # <--- 3 second sleep
user 0m0.000s
sys 0m0.015s
回到前提。而不是四个“脚本”,像我上面的 somescript.R
一样重写你的脚本文件。如果操作正确,它们可以 Rscript
ed 以及 source
d 以不同的意图。
我将使用这个脚本四次而不是四个脚本。这是我们想要自动化的手册 run-through:
# library(callr)
tasks <- list(
callr::r_bg(somefunc, args = list(5)),
callr::r_bg(somefunc, args = list(1)),
callr::r_bg(somefunc, args = list(10)),
callr::r_bg(somefunc, args = list(3))
)
sapply(tasks, function(tk) tk$is_alive())
# [1] TRUE FALSE TRUE FALSE
### time passes
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE TRUE FALSE
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE FALSE FALSE
tasks[[1]]$get_result()
# mpg cyl disp hp drat wt qsec vs am gear carb
# Merc 280 19.2 6 167.6 123 3.92 3.440 18.30 1 0 4 4
# Chrysler Imperial 14.7 8 440.0 230 3.23 5.345 17.42 0 0 3 4
我们可以使用
将其自动化source("somescript.R")
message(Sys.time(), " starting")
# 2020-08-28 07:45:31 starting
tasks <- list(
callr::r_bg(somefunc, args = list(5)),
callr::r_bg(somefunc, args = list(1)),
callr::r_bg(somefunc, args = list(10)),
callr::r_bg(somefunc, args = list(3))
)
# some reasonable time-between-checks
while (any(sapply(tasks, function(tk) tk$is_alive()))) {
message(Sys.time(), " still waiting")
Sys.sleep(1) # <-- over to you for a reasonable poll interval
}
# 2020-08-28 07:45:32 still waiting
# 2020-08-28 07:45:33 still waiting
# 2020-08-28 07:45:34 still waiting
# 2020-08-28 07:45:35 still waiting
# 2020-08-28 07:45:36 still waiting
# 2020-08-28 07:45:37 still waiting
# 2020-08-28 07:45:38 still waiting
# 2020-08-28 07:45:39 still waiting
# 2020-08-28 07:45:40 still waiting
# 2020-08-28 07:45:41 still waiting
message(Sys.time(), " done!")
# 2020-08-28 07:45:43 done!
results <- lapply(tasks, function(tk) tk$get_result())
str(results)
# List of 4
# $ :'data.frame': 2 obs. of 3 variables:
# ..$ mpg : num [1:2] 24.4 32.4
# ..$ cyl : num [1:2] 4 4
# ..$ disp: num [1:2] 146.7 78.7
# $ :'data.frame': 2 obs. of 3 variables:
# ..$ mpg : num [1:2] 30.4 14.3
# ..$ cyl : num [1:2] 4 8
# ..$ disp: num [1:2] 95.1 360
# $ :'data.frame': 2 obs. of 3 variables:
# ..$ mpg : num [1:2] 15.2 15.8
# ..$ cyl : num [1:2] 8 8
# ..$ disp: num [1:2] 276 351
# $ :'data.frame': 2 obs. of 3 variables:
# ..$ mpg : num [1:2] 14.3 15.2
# ..$ cyl : num [1:2] 8 8
# ..$ disp: num [1:2] 360 304
现在 运行 你的第五个 function/script。
您可以将 promises
与 future
结合使用:promises::promise_all
后跟 promises::then
允许在启动最后一个之前等待前一个 futures
完成一个作为后台进程。
当作业在后台 运行 时,您可以控制控制台。
library(future)
library(promises)
plan(multisession)
# Job1
fJob1<- future({
# Simulate a short duration job
Sys.sleep(3)
cat('Job1 done \n')
})
# Job2
fJob2<- future({
# Simulate a medium duration job
Sys.sleep(6)
cat('Job2 done \n')
})
# Job3
fJob3<- future({
# Simulate a long duration job
Sys.sleep(10)
cat('Job3 done \n')
})
# last Job
runLastJob <- function(res) {
cat('Last job launched \n')
# launch here script for last job
}
# Cancel last Job
cancelLastJob <- function(res) {
cat('Last job not launched \n')
}
# Wait for all jobs to be completed and launch last job
p_wait_all <- promises::promise_all(fJob1, fJob2, fJob3 )
promises::then(p_wait_all, onFulfilled = runLastJob, onRejected = cancelLastJob)
Job1 done
Job2 done
Job3 done
Last job launched