Future doFuture foreach 后端问题与 mutate

Future doFuture foreach backend issue with mutate

当我使用 future 后端运行时,我的 mutate 代码块会失败。 如果我使用 snow 后端,mutate 代码块可以得到正确执行。 如果我改用 base R 的写法(不使用 mutate),它在 future 后端下也能正常工作。

有人知道为什么这样行不通吗? 我是否应该只使用 snow 作为后端?



#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>     filter, lag
#> The following objects are masked from 'package:base':
#>     intersect, setdiff, setequal, union
#> Loading required package: iterators
#> Loading required package: snow
# Five-row example table that is handed to every foreach iteration.
# The first Sample_text value deliberately starts with a newline.
tmp_tb <- tibble::tibble(
  Id = 1:5,
  Sample_color = c("green", "blue", "yellow", "orange", "grey"),
  Sample_text = c("\n Test1", "Test2", "test 3", "test 4", "test 5")
)
# Walk over the rows of `df` and record, per row index, whether the work for
# that index succeeded ("Good") or ended in an error ("Failure").
#
# loop_n: index of the outer foreach iteration; stored in the Foreach_loop
#         column and prefixed to the progress messages.
# df:     table whose row count drives the inner for loop.
#
# NOTE(review): this excerpt is incomplete. The opening lines of the
# condition-handling wrapper (the printed messages mention
# withCallingHandlers, and `muffleStop = function()` looks like an argument
# to withRestarts), the body of the `error` handler, several closing braces
# and the final return value are not visible here. Do not run as-is.
tmp_fun <- function(loop_n, df) {
  print(paste0(loop_n, "before withCallingHndlears\n"))
  # Zero-row accumulator with three character columns.
  status_tb <- tibble::tibble(Foreach_loop = as.character(),
                              For_loop = as.character(),
                              Status = as.character())
  for (i in seq_len(nrow(df))) {
        if (i == 2) {
          # Row 2: nothing to do, just record success.
          tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
                                          For_loop = i,
                                          Status = "Good")
          status_tb <- rbind(status_tb, tmp_status_tb)
        } else if (i == 3) {
          # Row 3: add the TEST column through the pipe and dplyr::mutate().
          # This is the branch reported as "Failure" in the future-backend
          # output and as "Good" in the cluster-backend output further down.
          tb_test_df <- df %>%
            dplyr::mutate(TEST = "")
          tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
                                          For_loop = i,
                                          Status = "Good")
          status_tb <- rbind(status_tb, tmp_status_tb)
        } else if (i == 4) {
          # Row 4: add the same TEST column with base R `$<-` instead.
          tb_test_df <- df
          tb_test_df$TEST <- ""
          tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
                                          For_loop = i,
                                          Status = "Good")
          status_tb <- rbind(status_tb, tmp_status_tb)
        } else {
          # Every other row (1 and 5 for the five-row input) raises an error
          # on purpose.
          stop("this is an error!")
      # NOTE(review): presumably a restart named "muffleStop" that the error
      # handler invokes; the call that establishes it is not visible. The
      # closing remark of this post says restart names must not start with
      # "muffle".
      }, muffleStop = function() {
        message("'stop' muffled")
        # Record the failure for the current row.
        tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
                                        For_loop = i,
                                        Status = "Failure")
        status_tb <- rbind(status_tb, tmp_status_tb)
        # Write the updated accumulator into the frame four levels up.
        # NOTE(review): presumably that frame is tmp_fun's own; the depth
        # depends on the exact handler call stack — confirm.
        assign(x = "status_tb", value = status_tb, envir = parent.frame(n = 4))
    # NOTE(review): handler body is missing from this excerpt.
    error = function(cond) {
  print(paste0(loop_n, "after withCallingHndlears\n"))

# Run 1: evaluate the foreach loop on a future multisession plan with two
# workers.
# NOTE(review): no call that registers a %dopar% adapter for future
# (e.g. doFuture) is visible in this excerpt — presumably lost together with
# the library() calls.
numWorkers <- 2
future::plan(future::multisession, workers = numWorkers, gc = FALSE, earlySignal = TRUE)
# Call tmp_fun() once per outer index (1 and 2) and collect the results in a
# list; with .errorhandling = "pass" an error becomes the iteration's result
# instead of aborting the loop.
# NOTE(review): the closing `}` of the %dopar% body is not visible below.
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = TRUE, .errorhandling = "pass") %dopar% {
  tmp_fun(loop_n = out_i, df = tmp_tb)
#> numValues: 2, numResults: 0, stopped: TRUE
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "1before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "1after withCallingHndlears\n"
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "2before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "2after withCallingHndlears\n"
#> got results for task 1
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> got results for task 2
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#>   fun(accum, result.1, result.2)
#> returning status TRUE

# Stack the per-iteration status tables from the future-backend run into one
# table; in the output below the mutate row (For_loop 3) shows "Failure".
output_df <- bind_rows(status_ls)
#> # A tibble: 10 x 3
#>    Foreach_loop For_loop Status 
#>           <int>    <int> <chr>  
#>  1            1        1 Failure
#>  2            1        2 Good   
#>  3            1        3 Failure
#>  4            1        4 Good   
#>  5            1        5 Failure
#>  6            2        1 Failure
#>  7            2        2 Good   
#>  8            2        3 Failure
#>  9            2        4 Good   
#> 10            2        5 Failure

# Run 2: the same loop on a two-worker cluster created with
# parallel::makeCluster().
# NOTE(review): the call that registers `cl` as the %dopar% backend is not
# visible in this excerpt, and the cluster is never stopped in the visible
# code — confirm against the original script.
numWorkers <- 2
cl <- parallel::makeCluster(numWorkers)
# Same loop as before, but the pipe operator is exported to the workers
# explicitly.
# NOTE(review): the closing `}` of the %dopar% body is not visible below.
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = TRUE, .errorhandling = "pass",
                              .export = c('%>%')) %dopar% {
                                tmp_fun(loop_n = out_i, df = tmp_tb)
#> discovered package(s): 
#> automatically exporting the following variables from the local environment:
#>   tmp_fun, tmp_tb 
#> explicitly exporting variables(s): %>%
#> explicitly exporting package(s): 
#> numValues: 2, numResults: 0, stopped: TRUE
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#>   fun(accum, result.1, result.2)
#> returning status TRUE

# Stack the per-iteration status tables from the cluster-backend run; in the
# output below the mutate row (For_loop 3) shows "Good".
output_df <- bind_rows(status_ls)
#> # A tibble: 10 x 3
#>    Foreach_loop For_loop Status 
#>           <int>    <int> <chr>  
#>  1            1        1 Failure
#>  2            1        2 Good   
#>  3            1        3 Good   
#>  4            1        4 Good   
#>  5            1        5 Failure
#>  6            2        1 Failure
#>  7            2        2 Good   
#>  8            2        3 Good   
#>  9            2        4 Good   
#> 10            2        5 Failure

reprex package (v2.0.0)

于 2021-04-23 创建
问题与传给 withRestarts 函数的重启(restart)名称有关:请勿使用以 muffle 开头的重启名称(例如上面代码中的 muffleStop)。