R 中的并行处理 - 使用 mclapply() 与 pbmclapply() 设置种子

Parallel processing in R - setting seed with mclapply() vs. pbmclapply()

我正在 R 中并行化模拟(使用 parallel 包中的 mclapply())并希望跟踪每个函数调用的进度。所以我决定使用 pbmcapply 包中的 pbmclapply() 以便每次我 运行 我的模拟 (pbmclapply() 是专门创建的时候都有一个进度条作为 mclapply() 的包装器,因此除了进度条外它们应该具有相同的功能).

我能够使用 mclapply() 设置种子并获得可重现的结果而没有问题,但是 pbmclapply() 给我每个 运行 不同的结果,这让我感到困惑经过。我在下面包含了一个非常简单的 reprex。

例如,这是使用 mcapply():

## GIVES THE SAME RESULT EACH TIME IT IS RUN
library(parallel)
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
x <- mclapply(1:100, function(i) {rnorm(1)}, mc.cores = 2)
y <- do.call(rbind, x)
z <- mean(y)
print(mean(z))

这与使用 pbmclapply():

的代码相同
## GIVES DIFFERENT RESULTS EACH TIME IT IS RUN
library(pbmcapply)
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
x <- pbmclapply(1:100, function(i) {rnorm(1)}, mc.cores = 2)
y <- do.call(rbind, x)
z <- mean(y)
print(mean(z))

上面两个代码块之间的唯一区别是在第二个代码块中使用 pbmclapply() 而在第一个代码块中使用 mclapply(),但是每次我 运行 它,第二个块每次都给出不同的结果 运行 (尽管种子以相同的方式设置)。

这两个函数之间的播种程序有何不同? 如果您能提供任何关于为什么会发生这种情况的反馈,我将不胜感激。谢谢!

问题是在 pbmcapply 包内的 utils.R 文件中 运行 有以下行:

if (isTRUE(mc.set.seed))
      mc.set.stream()

如果我们将此与我们 运行 parallel 包中的 mclapply() 函数时调用的内容进行比较,我们会看到它 运行s:

if (mc.set.seed) 
        mc.reset.stream()

这会影响结果,因为重置流将允许代码 运行 来自全局设置的种子,而 运行ning 设置流使用初始值将其设置为新的随机起始值种子。我们可以在下面附加的函数中看到这一点:

mc.reset.stream <- function () 
{
    if (RNGkind()[1L] == "L'Ecuyer-CMRG") {
        if (!exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE)) 
            sample.int(1L)
        # HERE! sets the seed to the global seed variable we set
        assign("LEcuyer.seed", get(".Random.seed", envir = .GlobalEnv, 
            inherits = FALSE), envir = RNGenv)
    }
}

mc.set.stream <- function () 
{
    if (RNGkind()[1L] == "L'Ecuyer-CMRG") {
        assign(".Random.seed", get("LEcuyer.seed", envir = RNGenv), 
            envir = .GlobalEnv)
    }
    else {
        if (exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE)) 
            rm(".Random.seed", envir = .GlobalEnv, inherits = FALSE)
    }
}

我认为此更改可能是由于 mclapply 的问题所致,当您想在设置种子后多次调用 mclapply 函数时,它将使用相同的随机数。 (即,通过重置 r 会话,您应该以与 pbmclapply 相同的顺序获得相同的结果,所以第一次我得到 0.143,然后是 0.064,然后是 -0.015)。这通常是首选行为,因此您可以多次调用该函数。有关详细信息,请参阅 R doesn't reset the seed when "L'Ecuyer-CMRG" RNG is used?

如果将 .customized_mcparallel 函数定义中的行从 mc.set.stream() 更改为 mc.reset.stream(),则可以使用以下代码测试这两个实现之间的差异。在这里,我简化了包中的函数调用以去除进度条并仅保留计算(也删除错误检查)和设置随机种子的更改。 (另外请注意,这些功能将不再 运行 仅在 Windows 机器上 Linux 或 MacOS)。

library(pbmcapply)
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
pbmclapply <- function()  {

  pkg <- asNamespace('pbmcapply')
  .cleanup <- get('.cleanup', pkg)


  progressMonitor <- .customized_mcparallel({

    mclapply(1:100, function(i) {
            rnorm(1)
        }, mc.cores = 2, mc.preschedule = TRUE, mc.set.seed = TRUE,
                       mc.cleanup = TRUE, mc.allow.recursive = TRUE)
  })

  # clean up processes on exit
  on.exit(.cleanup(progressMonitor$pid), add = T)

  # Retrieve the result
  results <- suppressWarnings(mccollect(progressMonitor$pid)[[as.character(progressMonitor$pid)]])

  return(results)
}

.customized_mcparallel <- function (expr, name, detached = FALSE){
  # loading hidden functions
  pkg <- asNamespace('parallel')
  mcfork <- get('mcfork', pkg)
  mc.advance.stream <- get('mc.advance.stream', pkg)
  mcexit <- get('mcexit', pkg)
  mcinteractive <- get('mcinteractive', pkg)
  sendMaster <- get('sendMaster', pkg)
  mc.set.stream <- get('mc.set.stream', pkg)
  mc.reset.stream <- get('mc.reset.stream', pkg)

  f <- mcfork(F)
  env <- parent.frame()
  mc.advance.stream()
  if (inherits(f, "masterProcess")) {

    mc.set.stream()
    # reset the group process id of the forked process
    mcinteractive(FALSE)

    sendMaster(try(eval(expr, env), silent = TRUE))
    mcexit(0L)
  }

  f
}

x <- pbmclapply()
y <- do.call(rbind, x)
z <- mean(y)
print(z)

对于完整的补救措施,我最好的建议是在您自己的代码中重新实现这些功能(我复制并粘贴了一些来自 pbmcapply 的功能的小修改)或者通过分叉包并替换 mc.set.seed在带有 mc.reset.seed 的 utils.R 文件中。目前我想不出更简单的解决方案,但希望这能澄清问题。