R 中的并行处理 - 使用 mclapply() 与 pbmclapply() 设置种子
Parallel processing in R - setting seed with mclapply() vs. pbmclapply()
我正在 R 中并行化模拟(使用 parallel
包中的 mclapply()
)并希望跟踪每个函数调用的进度。所以我决定使用 pbmcapply
包中的 pbmclapply()
以便每次我 运行 我的模拟 (pbmclapply()
是专门创建的时候都有一个进度条作为 mclapply()
的包装器,因此除了进度条外它们应该具有相同的功能).
我能够使用 mclapply()
设置种子并获得可重现的结果而没有问题,但是 pbmclapply()
给我每个 运行 不同的结果,这让我感到困惑经过。我在下面包含了一个非常简单的 reprex。
例如,这是使用 mcapply()
:
## GIVES THE SAME RESULT EACH TIME IT IS RUN
library(parallel)
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
x <- mclapply(1:100, function(i) {rnorm(1)}, mc.cores = 2)
y <- do.call(rbind, x)
z <- mean(y)
print(mean(z))
这与使用 pbmclapply()
:
的代码相同
## GIVES DIFFERENT RESULTS EACH TIME IT IS RUN
library(pbmcapply)
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
x <- pbmclapply(1:100, function(i) {rnorm(1)}, mc.cores = 2)
y <- do.call(rbind, x)
z <- mean(y)
print(mean(z))
上面两个代码块之间的唯一区别是在第二个代码块中使用 pbmclapply()
而在第一个代码块中使用 mclapply()
,但是每次我 运行 它,第二个块每次都给出不同的结果 运行 (尽管种子以相同的方式设置)。
这两个函数之间的播种程序有何不同? 如果您能提供任何关于为什么会发生这种情况的反馈,我将不胜感激。谢谢!
问题是在 pbmcapply 包内的 utils.R 文件中 运行 有以下行:
if (isTRUE(mc.set.seed))
mc.set.stream()
如果我们将此与我们 运行 parallel 包中的 mclapply()
函数时调用的内容进行比较,我们会看到它 运行s:
if (mc.set.seed)
mc.reset.stream()
这会影响结果,因为重置流将允许代码 运行 来自全局设置的种子,而 运行ning 设置流使用初始值将其设置为新的随机起始值种子。我们可以在下面附加的函数中看到这一点:
mc.reset.stream <- function ()
{
if (RNGkind()[1L] == "L'Ecuyer-CMRG") {
if (!exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE))
sample.int(1L)
# HERE! sets the seed to the global seed variable we set
assign("LEcuyer.seed", get(".Random.seed", envir = .GlobalEnv,
inherits = FALSE), envir = RNGenv)
}
}
mc.set.stream <- function ()
{
if (RNGkind()[1L] == "L'Ecuyer-CMRG") {
assign(".Random.seed", get("LEcuyer.seed", envir = RNGenv),
envir = .GlobalEnv)
}
else {
if (exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE))
rm(".Random.seed", envir = .GlobalEnv, inherits = FALSE)
}
}
我认为此更改可能是由于 mclapply 的问题所致,当您想在设置种子后多次调用 mclapply 函数时,它将使用相同的随机数。 (即,通过重置 r 会话,您应该以与 pbmclapply 相同的顺序获得相同的结果,所以第一次我得到 0.143,然后是 0.064,然后是 -0.015)。这通常是首选行为,因此您可以多次调用该函数。有关详细信息,请参阅 R doesn't reset the seed when "L'Ecuyer-CMRG" RNG is used?。
如果将 .customized_mcparallel 函数定义中的行从 mc.set.stream() 更改为 mc.reset.stream(),则可以使用以下代码测试这两个实现之间的差异。在这里,我简化了包中的函数调用以去除进度条并仅保留计算(也删除错误检查)和设置随机种子的更改。 (另外请注意,这些功能将不再 运行 仅在 Windows 机器上 Linux 或 MacOS)。
library(pbmcapply)
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
pbmclapply <- function() {
pkg <- asNamespace('pbmcapply')
.cleanup <- get('.cleanup', pkg)
progressMonitor <- .customized_mcparallel({
mclapply(1:100, function(i) {
rnorm(1)
}, mc.cores = 2, mc.preschedule = TRUE, mc.set.seed = TRUE,
mc.cleanup = TRUE, mc.allow.recursive = TRUE)
})
# clean up processes on exit
on.exit(.cleanup(progressMonitor$pid), add = T)
# Retrieve the result
results <- suppressWarnings(mccollect(progressMonitor$pid)[[as.character(progressMonitor$pid)]])
return(results)
}
.customized_mcparallel <- function (expr, name, detached = FALSE){
# loading hidden functions
pkg <- asNamespace('parallel')
mcfork <- get('mcfork', pkg)
mc.advance.stream <- get('mc.advance.stream', pkg)
mcexit <- get('mcexit', pkg)
mcinteractive <- get('mcinteractive', pkg)
sendMaster <- get('sendMaster', pkg)
mc.set.stream <- get('mc.set.stream', pkg)
mc.reset.stream <- get('mc.reset.stream', pkg)
f <- mcfork(F)
env <- parent.frame()
mc.advance.stream()
if (inherits(f, "masterProcess")) {
mc.set.stream()
# reset the group process id of the forked process
mcinteractive(FALSE)
sendMaster(try(eval(expr, env), silent = TRUE))
mcexit(0L)
}
f
}
x <- pbmclapply()
y <- do.call(rbind, x)
z <- mean(y)
print(z)
对于完整的补救措施,我最好的建议是在您自己的代码中重新实现这些功能(我复制并粘贴了一些来自 pbmcapply 的功能的小修改)或者通过分叉包并替换 mc.set.seed在带有 mc.reset.seed 的 utils.R 文件中。目前我想不出更简单的解决方案,但希望这能澄清问题。
我正在 R 中并行化模拟(使用 parallel
包中的 mclapply()
)并希望跟踪每个函数调用的进度。所以我决定使用 pbmcapply
包中的 pbmclapply()
以便每次我 运行 我的模拟 (pbmclapply()
是专门创建的时候都有一个进度条作为 mclapply()
的包装器,因此除了进度条外它们应该具有相同的功能).
我能够使用 mclapply()
设置种子并获得可重现的结果而没有问题,但是 pbmclapply()
给我每个 运行 不同的结果,这让我感到困惑经过。我在下面包含了一个非常简单的 reprex。
例如,这是使用 mcapply()
:
## GIVES THE SAME RESULT EACH TIME IT IS RUN
library(parallel)
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
x <- mclapply(1:100, function(i) {rnorm(1)}, mc.cores = 2)
y <- do.call(rbind, x)
z <- mean(y)
print(mean(z))
这与使用 pbmclapply()
:
## GIVES DIFFERENT RESULTS EACH TIME IT IS RUN
library(pbmcapply)
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
x <- pbmclapply(1:100, function(i) {rnorm(1)}, mc.cores = 2)
y <- do.call(rbind, x)
z <- mean(y)
print(mean(z))
上面两个代码块之间的唯一区别是在第二个代码块中使用 pbmclapply()
而在第一个代码块中使用 mclapply()
,但是每次我 运行 它,第二个块每次都给出不同的结果 运行 (尽管种子以相同的方式设置)。
这两个函数之间的播种程序有何不同? 如果您能提供任何关于为什么会发生这种情况的反馈,我将不胜感激。谢谢!
问题是在 pbmcapply 包内的 utils.R 文件中 运行 有以下行:
if (isTRUE(mc.set.seed))
mc.set.stream()
如果我们将此与我们 运行 parallel 包中的 mclapply()
函数时调用的内容进行比较,我们会看到它 运行s:
if (mc.set.seed)
mc.reset.stream()
这会影响结果,因为重置流将允许代码 运行 来自全局设置的种子,而 运行ning 设置流使用初始值将其设置为新的随机起始值种子。我们可以在下面附加的函数中看到这一点:
mc.reset.stream <- function ()
{
if (RNGkind()[1L] == "L'Ecuyer-CMRG") {
if (!exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE))
sample.int(1L)
# HERE! sets the seed to the global seed variable we set
assign("LEcuyer.seed", get(".Random.seed", envir = .GlobalEnv,
inherits = FALSE), envir = RNGenv)
}
}
mc.set.stream <- function ()
{
if (RNGkind()[1L] == "L'Ecuyer-CMRG") {
assign(".Random.seed", get("LEcuyer.seed", envir = RNGenv),
envir = .GlobalEnv)
}
else {
if (exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE))
rm(".Random.seed", envir = .GlobalEnv, inherits = FALSE)
}
}
我认为此更改可能是由于 mclapply 的问题所致,当您想在设置种子后多次调用 mclapply 函数时,它将使用相同的随机数。 (即,通过重置 r 会话,您应该以与 pbmclapply 相同的顺序获得相同的结果,所以第一次我得到 0.143,然后是 0.064,然后是 -0.015)。这通常是首选行为,因此您可以多次调用该函数。有关详细信息,请参阅 R doesn't reset the seed when "L'Ecuyer-CMRG" RNG is used?。
如果将 .customized_mcparallel 函数定义中的行从 mc.set.stream() 更改为 mc.reset.stream(),则可以使用以下代码测试这两个实现之间的差异。在这里,我简化了包中的函数调用以去除进度条并仅保留计算(也删除错误检查)和设置随机种子的更改。 (另外请注意,这些功能将不再 运行 仅在 Windows 机器上 Linux 或 MacOS)。
library(pbmcapply)
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
pbmclapply <- function() {
pkg <- asNamespace('pbmcapply')
.cleanup <- get('.cleanup', pkg)
progressMonitor <- .customized_mcparallel({
mclapply(1:100, function(i) {
rnorm(1)
}, mc.cores = 2, mc.preschedule = TRUE, mc.set.seed = TRUE,
mc.cleanup = TRUE, mc.allow.recursive = TRUE)
})
# clean up processes on exit
on.exit(.cleanup(progressMonitor$pid), add = T)
# Retrieve the result
results <- suppressWarnings(mccollect(progressMonitor$pid)[[as.character(progressMonitor$pid)]])
return(results)
}
.customized_mcparallel <- function (expr, name, detached = FALSE){
# loading hidden functions
pkg <- asNamespace('parallel')
mcfork <- get('mcfork', pkg)
mc.advance.stream <- get('mc.advance.stream', pkg)
mcexit <- get('mcexit', pkg)
mcinteractive <- get('mcinteractive', pkg)
sendMaster <- get('sendMaster', pkg)
mc.set.stream <- get('mc.set.stream', pkg)
mc.reset.stream <- get('mc.reset.stream', pkg)
f <- mcfork(F)
env <- parent.frame()
mc.advance.stream()
if (inherits(f, "masterProcess")) {
mc.set.stream()
# reset the group process id of the forked process
mcinteractive(FALSE)
sendMaster(try(eval(expr, env), silent = TRUE))
mcexit(0L)
}
f
}
x <- pbmclapply()
y <- do.call(rbind, x)
z <- mean(y)
print(z)
对于完整的补救措施,我最好的建议是在您自己的代码中重新实现这些功能(我复制并粘贴了一些来自 pbmcapply 的功能的小修改)或者通过分叉包并替换 mc.set.seed在带有 mc.reset.seed 的 utils.R 文件中。目前我想不出更简单的解决方案,但希望这能澄清问题。