future/doFuture 作为 foreach 后端时 mutate 失败的问题

Future doFuture foreach backend issue with mutate

当我使用 future 后端运行时，包含 mutate 的代码块会失败；如果改用 snow 后端，mutate 代码块能被正确执行。而用 base R 的方式添加列（`tb_test_df$TEST <- ""`）时，在 future 后端下也能正常工作。

有人知道为什么会这样吗？我是否只能使用 snow 作为后端？

我在下面附上了会话信息（sessionInfo）。

请参阅下面的简化示例:

library(magrittr)
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(tibble)
library(foreach)
library(future)
library(stringr)
library(doSNOW)
#> Loading required package: iterators
#> Loading required package: snow
# Toy input shared by both backends: five rows, each with an integer id, a
# colour name and a short text field (the first text value starts with "\n").
tmp_tb <- tibble::tibble(
  Id           = 1:5,
  Sample_color = c("green", "blue", "yellow", "orange", "grey"),
  Sample_text  = c("\n Test1", "Test2", "test 3", "test 4", "test 5")
)
# Run one test step per row of `df` for a single foreach iteration and record
# whether each step succeeded.
#
# Steps 2-4 succeed: step 3 calls dplyr::mutate(), step 4 adds the same column
# with base R `$<-`. Every other step calls stop(). A calling handler prints
# the error message and invokes the "skipIteration" restart, so the step is
# recorded as "Failure" and the loop continues instead of aborting.
#
# The restart name must NOT start with "muffle": when future relays
# conditions signalled on a worker, it invokes the first restart whose name
# matches "^muffle". A restart named e.g. "muffleStop" is then triggered by
# ordinary (non-error) conditions signalled inside dplyr::mutate(), so step 3
# would wrongly report "Failure" under the future backend only.
#
# @param loop_n Identifier of the outer foreach iteration; copied into the
#   Foreach_loop column.
# @param df A data frame; nrow(df) sets the number of steps, and steps 3-4
#   add a TEST column to (a copy of) it.
# @return A tibble with one row per step and columns Foreach_loop,
#   For_loop (step index, integer) and Status ("Good" or "Failure").
tmp_fun <- function(loop_n, df) {
  print(paste0(loop_n, "before withCallingHndlears\n"))
  n_steps <- nrow(df)
  status <- character(n_steps)
  for (i in seq_len(n_steps)) {
    # withRestarts() returns either the value of its expression or the value
    # of the invoked restart, so each step yields its status directly and no
    # frame-walking assign() is needed.
    status[[i]] <- withCallingHandlers(
      withRestarts(
        {
          if (i == 2) {
            "Good"
          } else if (i == 3) {
            # Explicit call rather than `%>%`, so workers don't need magrittr.
            dplyr::mutate(df, TEST = "")
            "Good"
          } else if (i == 4) {
            tb_test_df <- df
            tb_test_df$TEST <- ""
            "Good"
          } else {
            stop("this is an error!")
          }
        },
        skipIteration = function() {
          message("'stop' muffled")
          "Failure"
        }
      ),
      error = function(cond) {
        print(cond$message)
        invokeRestart("skipIteration")
      }
    )
  }
  print(paste0(loop_n, "after withCallingHndlears\n"))
  # Build the result once; rep() keeps column types consistent (int/int/chr)
  # even when df has zero rows.
  tibble::tibble(
    Foreach_loop = rep(loop_n, n_steps),
    For_loop = seq_len(n_steps),
    Status = status
  )
}

# Backend 1: future, via the doFuture foreach adapter and two multisession
# workers. future::plan() invisibly returns the previously active plan; it is
# restored after the loop instead of resetting to the "default" plan.
doFuture::registerDoFuture()
numWorkers <- 2
oplan <- future::plan(
  future::multisession,
  workers = numWorkers, gc = FALSE, earlySignal = TRUE
)
status_ls <- foreach::foreach(
  out_i = seq_len(2L), .verbose = TRUE, .errorhandling = "pass"
) %dopar% {
  tmp_fun(loop_n = out_i, df = tmp_tb)
}
#> numValues: 2, numResults: 0, stopped: TRUE
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "1before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "1after withCallingHndlears\n"
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "2before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "2after withCallingHndlears\n"
#> got results for task 1
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> got results for task 2
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#>   fun(accum, result.1, result.2)
#> returning status TRUE

future::plan(oplan)
output_df <- bind_rows(status_ls)
# NOTE: in the output captured below, rows 3 and 8 (step 3, the
# dplyr::mutate() step) report "Failure". It was recorded while tmp_fun()
# used a restart named "muffleStop"; see the resolution at the end of this
# post about restart names starting with "muffle".
output_df
#> # A tibble: 10 x 3
#>    Foreach_loop For_loop Status 
#>           <int>    <int> <chr>  
#>  1            1        1 Failure
#>  2            1        2 Good   
#>  3            1        3 Failure
#>  4            1        4 Good   
#>  5            1        5 Failure
#>  6            2        1 Failure
#>  7            2        2 Good   
#>  8            2        3 Failure
#>  9            2        4 Good   
#> 10            2        5 Failure

# Backend 2: a two-worker socket cluster registered through doSNOW. `%>%` is
# exported by name (presumably because the fresh worker sessions do not have
# magrittr attached).
numWorkers <- 2
cl <- parallel::makeCluster(numWorkers)
doSNOW::registerDoSNOW(cl)
status_ls <- foreach::foreach(
  out_i = seq_len(2L),
  .verbose = TRUE,
  .errorhandling = "pass",
  .export = "%>%"
) %dopar% tmp_fun(loop_n = out_i, df = tmp_tb)
#> discovered package(s): 
#> automatically exporting the following variables from the local environment:
#>   tmp_fun, tmp_tb 
#> explicitly exporting variables(s): %>%
#> explicitly exporting package(s): 
#> numValues: 2, numResults: 0, stopped: TRUE
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#>   fun(accum, result.1, result.2)
#> returning status TRUE
parallel::stopCluster(cl)

# Stack the per-iteration tibbles into one table and print it.
output_df <- dplyr::bind_rows(status_ls)
output_df
#> # A tibble: 10 x 3
#>    Foreach_loop For_loop Status 
#>           <int>    <int> <chr>  
#>  1            1        1 Failure
#>  2            1        2 Good   
#>  3            1        3 Good   
#>  4            1        4 Good   
#>  5            1        5 Failure
#>  6            2        1 Failure
#>  7            2        2 Good   
#>  8            2        3 Good   
#>  9            2        4 Good   
#> 10            2        5 Failure

由 reprex 包 (v2.0.0) 于 2021-04-23 创建
# Record the R version, platform and package versions used for this reprex.
utils::sessionInfo()
#> R version 4.0.0 (2020-04-24)
#> Platform: x86_64-pc-linux-gnu (64-bit)
#> Running under: Red Hat Enterprise Linux
#> 
#> Matrix products: default
#> BLAS:   /opt/R/R_4.0.0/lib64/R/lib/libRblas.so
#> LAPACK: /opt/R/R_4.0.0/lib64/R/lib/libRlapack.so
#> 
#> locale:
#>  [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
#>  [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
#>  [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
#>  [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
#>  [9] LC_ADDRESS=C               LC_TELEPHONE=C            
#> [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       
#> 
#> attached base packages:
#> [1] stats     graphics  grDevices utils     datasets  methods   base     
#> 
#> other attached packages:
#> [1] doSNOW_1.0.19    snow_0.4-3       iterators_1.0.13 stringr_1.4.0   
#> [5] future_1.21.0    foreach_1.5.1    tibble_3.1.1     dplyr_1.0.5     
#> [9] magrittr_2.0.1  
#> 
#> loaded via a namespace (and not attached):
#>  [1] pillar_1.6.0      compiler_4.0.0    highr_0.9         tools_4.0.0      
#>  [5] digest_0.6.27     evaluate_0.14     lifecycle_1.0.0   pkgconfig_2.0.3  
#>  [9] rlang_0.4.10      reprex_2.0.0      cli_2.4.0         DBI_1.1.1        
#> [13] rstudioapi_0.13   parallel_4.0.0    yaml_2.2.1        xfun_0.22        
#> [17] withr_2.4.2       knitr_1.32        generics_0.1.0    fs_1.5.0         
#> [21] vctrs_0.3.7       globals_0.14.0    tidyselect_1.1.0  doFuture_0.12.0  
#> [25] glue_1.4.2        listenv_0.8.0     R6_2.5.0          parallelly_1.24.0
#> [29] fansi_0.4.2       rmarkdown_2.7     purrr_0.3.4       codetools_0.2-18 
#> [33] ellipsis_0.3.1    htmltools_0.5.1.1 assertthat_0.2.1  utf8_1.2.1       
#> [37] stringi_1.5.3     crayon_1.4.1

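问题出在 withRestarts 中定义的重启（restart）名称上：请勿使用以 muffle 开头的名称。future 在转发工作进程中产生的条件（condition）时，会调用名称以 muffle 开头的重启来屏蔽这些条件。因此 dplyr::mutate 内部发出的普通条件误触发了 muffleStop，第 3 步被记为 Failure。从输出中也能看出这一点：每个循环打印了三次 "'stop' muffled"，却只打印了两次 "this is an error!"。将重启改名（例如 skipIteration）即可解决。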