R并行中止所有mclapply操作

R parallel abort all mclapply operations

是否可以要求 parallel::mclapply() 在其任何一个过程中遇到错误(例如 stop())时尽快放弃所有进一步处理?

标准 mclapply() 调用无法在一个进程出错时终止集群所有进程的评估。这样做的原因是进程在完成之前不会相互通信。

使用 R 包 future 可以实现这样的行为。思路是

  1. 创建期货并并行评估它们
  2. 每 2 秒检查一次功能是否已解决为错误
  3. 如果检测到错误,则终止集群的所有进程

这是如何工作的草图:

library(future)
library(parallel)
library(tools)

parallelLapply <- function(x, fun, checkInterval=2, nProcess=2){ 

    ## setup cluster and get process IDs of process in cluster
    cl <- makeCluster(spec=nProcess)
    pids <- unlist(parLapply(cl=cl, X=1:nProcess, function(x) Sys.getpid()))
    plan(cluster, workers=cl)

    ## create futures and start their evaluation 
    fList <- lapply(1:2, function(x) futureCall(function(x) try(fun(x), silent=TRUE), list(x=x)))

    ## check every 2 second whether an error occurred or whether all are resolved
    while(TRUE){
        Sys.sleep(checkInterval)

        ## check for errors
        errorStatus <- unlist(lapply(fList, function(x)
            resolved(x) && class(value(x))=="try-error"))
        if(any(unlist(errorStatus))){
            lapply(pids, pskill)
            results <- NULL
            cat("an error occurred in one future: all process of the cluster were killed.\n")
            break
        }

        ## check if all resolved without error
        allResolved <- all(unlist(lapply(fList, resolved)))
        if(allResolved){
            results <- lapply(fList, value)
            cat("all futures are resolved sucessfully.\n")
            break
        }
    }
    results
}

## test 1: early termination because x=1 results in an error. 
f1 <- function(x){
    if(x==1) stop()
    Sys.sleep(15)
    x
}
parallelLapply(x=1:5, fun=f1)
# an error occurred in one future: all process of the cluster were killed.
# NULL


## test 2: no error 
f2 <- function(x){
    Sys.sleep(15)
    x
}
parallelLapply(x=1:5, fun=f2)
## all futures are resolved sucessfully.
## [[1]]
## [1] 1
## 
## [[2]]
## [1] 2

注:

  • 如果传递给 fun 的函数取决于其他参数,则需要进行额外调整。
  • 在 Linux 上,为方便起见,可以使用 makeForkCluster() 而不是 makeCluster()。那么用法更接近于mclapply().

以下是丑陋的,但可行。它使用文件系统作为全局共享变量。

options( mc.cores=2 )

if (!exists("touchFile"))
    touchFile <- function(filename) { system(paste0("touch ", filename)); }

tfnm <- paste0("mytemporary",as.numeric(Sys.time()))

mfun <- function( i ) {
    if (file.exists( tfnm )) stop("done due to process ", i)
    message("Mfun(", i,")")
    if ( i == 3 ) { message("creating ", tfnm); touchFile(tfnm); stop("goodbye"); }
    Sys.sleep( i%%3 )
}

v <- mclapply( 1:10, mfun )
if (file.exists(tfnm)) file.remove(tfnm)

mclapply自己实现会更好

这是 ivo Welch 建议的更简洁版本。请注意,此 不会在发生错误时停止 运行 进程 ,而是 阻止开始新的评估 FUN.

library(parallel)
mcLapply <- function(X, FUN, ..., mc.preschedule=TRUE,
                     mc.set.seed=TRUE, mc.silent=FALSE,
                     mc.cores=getOption("mc.cores", 2L), 
                     mc.cleanup=TRUE, mc.allow.recursive=TRUE,
                     affinity.list=NULL){
    tmpFileName <- tempfile()
    fn <- function(X){
        if(file.exists(tmpFileName))
            return(NA)
        o <- try(do.call("FUN", c(X, list(...))), silent=TRUE)
        if(class(o)=="try-error"){
            file.create(tmpFileName)
        }
        o
    }
    ret <- mclapply(X=X, FUN=fn, mc.preschedule=mc.preschedule,
                    mc.set.seed=mc.set.seed, mc.silent=mc.silent,
                    mc.cores=mc.cores, mc.cleanup=mc.cleanup,
                    mc.allow.recursive=mc.allow.recursive,
                    affinity.list=affinity.list)
    if(exists(tmpFileName))
        file.remove(tmpFileName)
    ret
}

## test 1: early termination because x=1 results in an error. 
f1 <- function(x){
    if(x==1) stop()
    Sys.sleep(1)
    x
}
mcLapply(X=1:3, FUN=f1)
## [[1]]
## [1] "Error in FUN(1L) : \n"
## attr(,"class")
## [1] "try-error"
## attr(,"condition")
## <simpleError in FUN(1L): >
## 
## [[2]]
## [1] NA
## 
## [[3]]
## [1] NA

## test 2: no error 
f2 <- function(x, a){
    Sys.sleep(1)
    x+a
}
mcLapply(X=1:2, FUN=f2, a=10)
## [[1]]
## [1] 11
## 
## [[2]]
## [1] 12

还有一种做法: 思路是在#!!所指的三个地方修改parallel::mclapply()。新参数 stop.on.error 可用于指定发生错误时是否应停止执行。

library(parallel)
Mclapply <- function (X, FUN, ..., mc.preschedule = TRUE,
                      mc.set.seed = TRUE, mc.silent = FALSE,
                      mc.cores = getOption("mc.cores", 2L), 
                      mc.cleanup = TRUE, mc.allow.recursive = TRUE,
                      affinity.list = NULL, stop.on.error=FALSE) 
{
    stop.on.error <- stop.on.error[1]        #!!
    stopifnot(is.logical(stop.on.error))     #!!
    cores <- as.integer(mc.cores)
    if ((is.na(cores) || cores < 1L) && is.null(affinity.list)) 
        stop("'mc.cores' must be >= 1")
    parallel:::.check_ncores(cores)
    if (parallel:::isChild() && !isTRUE(mc.allow.recursive)) 
        return(lapply(X = X, FUN = FUN, ...))
    if (!is.vector(X) || is.object(X)) 
        X <- as.list(X)
    if (!is.null(affinity.list) && length(affinity.list) < length(X)) 
        stop("affinity.list and X must have the same length")
    if (mc.set.seed) 
        mc.reset.stream()
    if (length(X) < 2) {
        old.aff <- mcaffinity()
        mcaffinity(affinity.list[[1]])
        res <- lapply(X = X, FUN = FUN, ...)
        mcaffinity(old.aff)
        return(res)
    }
    if (length(X) < cores) 
        cores <- length(X)
    if (cores < 2L && is.null(affinity.list)) 
        return(lapply(X = X, FUN = FUN, ...))
    jobs <- list()
    parallel:::prepareCleanup()
    on.exit(parallel:::cleanup(mc.cleanup))
    if (!mc.preschedule) {
        FUN <- match.fun(FUN)
        if (length(X) <= cores && is.null(affinity.list)) {
            jobs <- lapply(seq_along(X), function(i) mcparallel(FUN(X[[i]], 
                ...), name = names(X)[i], mc.set.seed = mc.set.seed, 
                silent = mc.silent))
            res <- mccollect(jobs)
            if (length(res) == length(X)) 
                names(res) <- names(X)
            has.errors <- sum(sapply(res, inherits, "try-error"))
        }
        else {
            sx <- seq_along(X)
            res <- vector("list", length(sx))
            names(res) <- names(X)
            fin <- rep(FALSE, length(X))
            if (!is.null(affinity.list)) {
                cores <- max(unlist(x = affinity.list, recursive = TRUE))
                d0 <- logical(cores)
                cpu.map <- lapply(sx, function(i) {
                  data <- d0
                  data[as.vector(affinity.list[[i]])] <- TRUE
                  data
                })
                ava <- do.call(rbind, cpu.map)
            }
            else {
                ava <- matrix(TRUE, nrow = length(X), ncol = cores)
            }
            jobid <- integer(cores)
            for (i in 1:cores) {
                jobid[i] <- match(TRUE, ava[, i])
                ava[jobid[i], ] <- FALSE
            }
            if (anyNA(jobid)) {
                unused <- which(is.na(jobid))
                jobid <- jobid[-unused]
                ava <- ava[, -unused, drop = FALSE]
            }
            jobs <- lapply(jobid, function(i) mcparallel(FUN(X[[i]], 
                ...), mc.set.seed = mc.set.seed, silent = mc.silent, 
                mc.affinity = affinity.list[[i]]))
            jobsp <- parallel:::processID(jobs)
            has.errors <- 0L
            delivered.result <- 0L
            while (!all(fin)) {
                s <- parallel:::selectChildren(jobs[!is.na(jobsp)], -1)
                if (is.null(s)) 
                  break
                if (is.integer(s)) 
                  for (ch in s) {
                    ji <- match(TRUE, jobsp == ch)
                    ci <- jobid[ji]
                    r <- parallel:::readChild(ch)
                    if (is.raw(r)) {
                      child.res <- unserialize(r)
                      if (inherits(child.res, "try-error")){
                          if(stop.on.error)                     #!!
                              stop("error in process X = ", ci, "\n", attr(child.res, "condition")$message) #!!
                          has.errors <- has.errors + 1L
                      }
                      if (!is.null(child.res)) 
                        res[[ci]] <- child.res
                      delivered.result <- delivered.result + 
                        1L
                    }
                    else {
                      fin[ci] <- TRUE
                      jobsp[ji] <- jobid[ji] <- NA
                      if (any(ava)) {
                        nexti <- which.max(ava[, ji])
                        if (!is.na(nexti)) {
                          jobid[ji] <- nexti
                          jobs[[ji]] <- mcparallel(FUN(X[[nexti]], 
                            ...), mc.set.seed = mc.set.seed, 
                            silent = mc.silent, mc.affinity = affinity.list[[nexti]])
                          jobsp[ji] <- parallel:::processID(jobs[[ji]])
                          ava[nexti, ] <- FALSE
                        }
                      }
                    }
                  }
            }
            nores <- length(X) - delivered.result
            if (nores > 0) 
                warning(sprintf(ngettext(nores, "%d parallel function call did not deliver a result", 
                  "%d parallel function calls did not deliver results"), 
                  nores), domain = NA)
        }
        if (has.errors) 
            warning(gettextf("%d function calls resulted in an error", 
                has.errors), domain = NA)
        return(res)
    }
    if (!is.null(affinity.list)) 
        warning("'mc.preschedule' must be false if 'affinity.list' is used")
    sindex <- lapply(seq_len(cores), function(i) seq(i, length(X), 
        by = cores))
    schedule <- lapply(seq_len(cores), function(i) X[seq(i, length(X), 
        by = cores)])
    ch <- list()
    res <- vector("list", length(X))
    names(res) <- names(X)
    cp <- rep(0L, cores)
    fin <- rep(FALSE, cores)
    dr <- rep(FALSE, cores)
    inner.do <- function(core) {
        S <- schedule[[core]]
        f <- parallel:::mcfork()
        if (isTRUE(mc.set.seed)) 
            parallel:::mc.advance.stream()
        if (inherits(f, "masterProcess")) {
            on.exit(mcexit(1L, structure("fatal error in wrapper code", 
                class = "try-error")))
            if (isTRUE(mc.set.seed)) 
                parallel:::mc.set.stream()
            if (isTRUE(mc.silent)) 
                closeStdout(TRUE)
            parallel:::sendMaster(try(lapply(X = S, FUN = FUN, ...), silent = TRUE))
            parallel:::mcexit(0L)
        }
        jobs[[core]] <<- ch[[core]] <<- f
        cp[core] <<- parallel:::processID(f)
        NULL
    }
    job.res <- lapply(seq_len(cores), inner.do)
    ac <- cp[cp > 0]
    has.errors <- integer(0)
    while (!all(fin)) {
        s <- parallel:::selectChildren(ac[!fin], -1)
        if (is.null(s)) 
            break
        if (is.integer(s)) 
            for (ch in s) {
                a <- parallel:::readChild(ch)
                if (is.integer(a)) {
                  core <- which(cp == a)
                  fin[core] <- TRUE
                }
                else if (is.raw(a)) {
                  core <- which(cp == attr(a, "pid"))
                  job.res[[core]] <- ijr <- unserialize(a)
                  if (inherits(ijr, "try-error")){ 
                    has.errors <- c(has.errors, core)
                    if(stop.on.error)  #!!
                        stop("error in one of X = ", paste(schedule[[core]], collapse=", "), "\n", attr(ijr, "condition")$message) #!!
                  }
                  dr[core] <- TRUE
                }
                else if (is.null(a)) {
                  core <- which(cp == ch)
                  fin[core] <- TRUE
                }
            }
    }
    for (i in seq_len(cores)) {
        this <- job.res[[i]]
        if (inherits(this, "try-error")) {
            for (j in sindex[[i]]) res[[j]] <- this
        }
        else if (!is.null(this)) 
            res[sindex[[i]]] <- this
    }
    nores <- cores - sum(dr)
    if (nores > 0) 
        warning(sprintf(ngettext(nores, "scheduled core %s did not deliver a result, all values of the job will be affected", 
            "scheduled cores %s did not deliver results, all values of the jobs will be affected"), 
            paste(which(dr == FALSE), collapse = ", ")), domain = NA)
    if (length(has.errors)) {
        if (length(has.errors) == cores) 
            warning("all scheduled cores encountered errors in user code")
        else warning(sprintf(ngettext(has.errors, "scheduled core %s encountered error in user code, all values of the job will be affected", 
            "scheduled cores %s encountered errors in user code, all values of the jobs will be affected"), 
            paste(has.errors, collapse = ", ")), domain = NA)
    }
    res
}

测试:

f <- function(x, errorAt=1, sleep=2){
    if(x==errorAt) stop("-->> test error <<--")
    Sys.sleep(sleep)
    x
}

options(mc.cores=2)              
Mclapply(X=1:4, FUN=f, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, stop.on.error = TRUE) : 
##   error in one of X = 1, 3
## -->> test error <<--

Mclapply(X=1:4, FUN=f, errorAt=3, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, errorAt = 3, stop.on.error = TRUE) : 
##   error in one of X = 1, 3
## -->> test error <<--

Mclapply(X=1:4, FUN=f, errorAt=Inf, stop.on.error=TRUE)
## [[1]]
## [1] 1
## 
## [[2]]
## [1] 2
##
## [[3]]
## [1] 3
## 
## [[4]]
## [1] 4

Mclapply(X=1:4, FUN=f, mc.preschedule=FALSE, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, mc.preschedule = FALSE, stop.on.error = TRUE) : 
##   error in process X = 1
## -->> test error <<--

Mclapply(X=1:4, FUN=f, errorAt=3, mc.preschedule=FALSE, stop.on.error=TRUE)
## Error in Mclapply(X = 1:4, FUN = f, errorAt = 3, mc.preschedule = FALSE,  : 
##   error in process X = 3
## -->> test error <<--

Mclapply(X=1:4, FUN=f, errorAt=Inf, mc.preschedule=FALSE, stop.on.error=TRUE)
## [[1]]
## [1] 1
## 
## [[2]]
## [1] 2
##
## [[3]]
## [1] 3
## 
## [[4]]
## [1] 4

此方法使用包 parallel 的许多内部函数(例如,parallel:::isChild())。它适用于 R 版本 3.6.0。