Future doFuture foreach 后端问题与 mutate
Future doFuture foreach backend issue with mutate
当我 运行 使用未来后端时,我的变异块就失败了。
如果我使用 snow 后端,则变异块会得到正确评估。
如果我使用 base R,它可以与 Future 一起使用。
知道为什么这行不通吗?
我应该只在后端使用 snow 吗?
我包括我的会话信息
请参阅下面的简化示例:
library(magrittr)
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(tibble)
library(foreach)
library(future)
library(stringr)
library(doSNOW)
#> Loading required package: iterators
#> Loading required package: snow
tmp_tb <- tibble::tibble(Id = c(1:5),
Sample_color = c("green", "blue", "yellow", "orange", "grey"),
Sample_text = c("\n Test1", "Test2", "test 3", "test 4", "test 5"))
tmp_fun <- function(loop_n, df) {
print(paste0(loop_n, "before withCallingHndlears\n"))
status_tb <- tibble::tibble(Foreach_loop = as.character(),
For_loop = as.character(),
Status = as.character())
for (i in seq_len(nrow(df))) {
withCallingHandlers({
withRestarts({
if (i == 2) {
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else if (i == 3) {
tb_test_df <- df %>%
dplyr::mutate(TEST = "")
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else if (i == 4) {
tb_test_df <- df
tb_test_df$TEST <- ""
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else {
stop("this is an error!")
}
}, muffleStop = function() {
message("'stop' muffled")
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Failure")
status_tb <- rbind(status_tb, tmp_status_tb)
assign(x = "status_tb", value = status_tb, envir = parent.frame(n = 4))
})
},
error = function(cond) {
print(cond$message)
invokeRestart("muffleStop")
}
)
}
print(paste0(loop_n, "after withCallingHndlears\n"))
return(status_tb)
}
doFuture::registerDoFuture()
numWorkers <- 2
future::plan(future::multisession, workers = numWorkers, gc = FALSE, earlySignal = TRUE)
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = TRUE, .errorhandling = "pass") %dopar% {
tmp_fun(loop_n = out_i, df = tmp_tb)
}
#> numValues: 2, numResults: 0, stopped: TRUE
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "1before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "1after withCallingHndlears\n"
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "2before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "2after withCallingHndlears\n"
#> got results for task 1
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> got results for task 2
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#> fun(accum, result.1, result.2)
#> returning status TRUE
future::plan("default")
output_df <- bind_rows(status_ls)
output_df
#> # A tibble: 10 x 3
#> Foreach_loop For_loop Status
#> <int> <int> <chr>
#> 1 1 1 Failure
#> 2 1 2 Good
#> 3 1 3 Failure
#> 4 1 4 Good
#> 5 1 5 Failure
#> 6 2 1 Failure
#> 7 2 2 Good
#> 8 2 3 Failure
#> 9 2 4 Good
#> 10 2 5 Failure
numWorkers <- 2
cl <- parallel::makeCluster(numWorkers)
doSNOW::registerDoSNOW(cl)
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = TRUE, .errorhandling = "pass",
.export = c('%>%')) %dopar% {
tmp_fun(loop_n = out_i, df = tmp_tb)
}
#> discovered package(s):
#> automatically exporting the following variables from the local environment:
#> tmp_fun, tmp_tb
#> explicitly exporting variables(s): %>%
#> explicitly exporting package(s):
#> numValues: 2, numResults: 0, stopped: TRUE
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#> fun(accum, result.1, result.2)
#> returning status TRUE
parallel::stopCluster(cl)
output_df <- bind_rows(status_ls)
output_df
#> # A tibble: 10 x 3
#> Foreach_loop For_loop Status
#> <int> <int> <chr>
#> 1 1 1 Failure
#> 2 1 2 Good
#> 3 1 3 Good
#> 4 1 4 Good
#> 5 1 5 Failure
#> 6 2 1 Failure
#> 7 2 2 Good
#> 8 2 3 Good
#> 9 2 4 Good
#> 10 2 5 Failure
由 reprex package (v2.0.0)
于 2021-04-23 创建
sessionInfo()
#> R version 4.0.0 (2020-04-24)
#> Platform: x86_64-pc-linux-gnu (64-bit)
#> Running under: Red Hat Enterprise Linux
#>
#> Matrix products: default
#> BLAS: /opt/R/R_4.0.0/lib64/R/lib/libRblas.so
#> LAPACK: /opt/R/R_4.0.0/lib64/R/lib/libRlapack.so
#>
#> locale:
#> [1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C
#> [3] LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
#> [5] LC_MONETARY=en_US.UTF-8 LC_MESSAGES=en_US.UTF-8
#> [7] LC_PAPER=en_US.UTF-8 LC_NAME=C
#> [9] LC_ADDRESS=C LC_TELEPHONE=C
#> [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
#>
#> attached base packages:
#> [1] stats graphics grDevices utils datasets methods base
#>
#> other attached packages:
#> [1] doSNOW_1.0.19 snow_0.4-3 iterators_1.0.13 stringr_1.4.0
#> [5] future_1.21.0 foreach_1.5.1 tibble_3.1.1 dplyr_1.0.5
#> [9] magrittr_2.0.1
#>
#> loaded via a namespace (and not attached):
#> [1] pillar_1.6.0 compiler_4.0.0 highr_0.9 tools_4.0.0
#> [5] digest_0.6.27 evaluate_0.14 lifecycle_1.0.0 pkgconfig_2.0.3
#> [9] rlang_0.4.10 reprex_2.0.0 cli_2.4.0 DBI_1.1.1
#> [13] rstudioapi_0.13 parallel_4.0.0 yaml_2.2.1 xfun_0.22
#> [17] withr_2.4.2 knitr_1.32 generics_0.1.0 fs_1.5.0
#> [21] vctrs_0.3.7 globals_0.14.0 tidyselect_1.1.0 doFuture_0.12.0
#> [25] glue_1.4.2 listenv_0.8.0 R6_2.5.0 parallelly_1.24.0
#> [29] fansi_0.4.2 rmarkdown_2.7 purrr_0.3.4 codetools_0.2-18
#> [33] ellipsis_0.3.1 htmltools_0.5.1.1 assertthat_0.2.1 utf8_1.2.1
#> [37] stringi_1.5.3 crayon_1.4.1
问题与 withRestarts 函数的名称有关....请勿使用以 muffle 开头的名称。
当我 运行 使用未来后端时,我的变异块就失败了。 如果我使用 snow 后端,则变异块会得到正确评估。 如果我使用 base R,它可以与 Future 一起使用。
知道为什么这行不通吗? 我应该只在后端使用 snow 吗?
我包括我的会话信息
请参阅下面的简化示例:
library(magrittr)
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(tibble)
library(foreach)
library(future)
library(stringr)
library(doSNOW)
#> Loading required package: iterators
#> Loading required package: snow
tmp_tb <- tibble::tibble(Id = c(1:5),
Sample_color = c("green", "blue", "yellow", "orange", "grey"),
Sample_text = c("\n Test1", "Test2", "test 3", "test 4", "test 5"))
tmp_fun <- function(loop_n, df) {
print(paste0(loop_n, "before withCallingHndlears\n"))
status_tb <- tibble::tibble(Foreach_loop = as.character(),
For_loop = as.character(),
Status = as.character())
for (i in seq_len(nrow(df))) {
withCallingHandlers({
withRestarts({
if (i == 2) {
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else if (i == 3) {
tb_test_df <- df %>%
dplyr::mutate(TEST = "")
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else if (i == 4) {
tb_test_df <- df
tb_test_df$TEST <- ""
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else {
stop("this is an error!")
}
}, muffleStop = function() {
message("'stop' muffled")
tmp_status_tb <- tibble::tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Failure")
status_tb <- rbind(status_tb, tmp_status_tb)
assign(x = "status_tb", value = status_tb, envir = parent.frame(n = 4))
})
},
error = function(cond) {
print(cond$message)
invokeRestart("muffleStop")
}
)
}
print(paste0(loop_n, "after withCallingHndlears\n"))
return(status_tb)
}
doFuture::registerDoFuture()
numWorkers <- 2
future::plan(future::multisession, workers = numWorkers, gc = FALSE, earlySignal = TRUE)
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = TRUE, .errorhandling = "pass") %dopar% {
tmp_fun(loop_n = out_i, df = tmp_tb)
}
#> numValues: 2, numResults: 0, stopped: TRUE
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "1before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "1after withCallingHndlears\n"
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "2before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "2after withCallingHndlears\n"
#> got results for task 1
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> got results for task 2
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#> fun(accum, result.1, result.2)
#> returning status TRUE
future::plan("default")
output_df <- bind_rows(status_ls)
output_df
#> # A tibble: 10 x 3
#> Foreach_loop For_loop Status
#> <int> <int> <chr>
#> 1 1 1 Failure
#> 2 1 2 Good
#> 3 1 3 Failure
#> 4 1 4 Good
#> 5 1 5 Failure
#> 6 2 1 Failure
#> 7 2 2 Good
#> 8 2 3 Failure
#> 9 2 4 Good
#> 10 2 5 Failure
numWorkers <- 2
cl <- parallel::makeCluster(numWorkers)
doSNOW::registerDoSNOW(cl)
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = TRUE, .errorhandling = "pass",
.export = c('%>%')) %dopar% {
tmp_fun(loop_n = out_i, df = tmp_tb)
}
#> discovered package(s):
#> automatically exporting the following variables from the local environment:
#> tmp_fun, tmp_tb
#> explicitly exporting variables(s): %>%
#> explicitly exporting package(s):
#> numValues: 2, numResults: 0, stopped: TRUE
#> numValues: 2, numResults: 1, stopped: TRUE
#> returning status FALSE
#> numValues: 2, numResults: 2, stopped: TRUE
#> calling combine function
#> evaluating call object to combine results:
#> fun(accum, result.1, result.2)
#> returning status TRUE
parallel::stopCluster(cl)
output_df <- bind_rows(status_ls)
output_df
#> # A tibble: 10 x 3
#> Foreach_loop For_loop Status
#> <int> <int> <chr>
#> 1 1 1 Failure
#> 2 1 2 Good
#> 3 1 3 Good
#> 4 1 4 Good
#> 5 1 5 Failure
#> 6 2 1 Failure
#> 7 2 2 Good
#> 8 2 3 Good
#> 9 2 4 Good
#> 10 2 5 Failure
由 reprex package (v2.0.0)
于 2021-04-23 创建sessionInfo()
#> R version 4.0.0 (2020-04-24)
#> Platform: x86_64-pc-linux-gnu (64-bit)
#> Running under: Red Hat Enterprise Linux
#>
#> Matrix products: default
#> BLAS: /opt/R/R_4.0.0/lib64/R/lib/libRblas.so
#> LAPACK: /opt/R/R_4.0.0/lib64/R/lib/libRlapack.so
#>
#> locale:
#> [1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C
#> [3] LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
#> [5] LC_MONETARY=en_US.UTF-8 LC_MESSAGES=en_US.UTF-8
#> [7] LC_PAPER=en_US.UTF-8 LC_NAME=C
#> [9] LC_ADDRESS=C LC_TELEPHONE=C
#> [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
#>
#> attached base packages:
#> [1] stats graphics grDevices utils datasets methods base
#>
#> other attached packages:
#> [1] doSNOW_1.0.19 snow_0.4-3 iterators_1.0.13 stringr_1.4.0
#> [5] future_1.21.0 foreach_1.5.1 tibble_3.1.1 dplyr_1.0.5
#> [9] magrittr_2.0.1
#>
#> loaded via a namespace (and not attached):
#> [1] pillar_1.6.0 compiler_4.0.0 highr_0.9 tools_4.0.0
#> [5] digest_0.6.27 evaluate_0.14 lifecycle_1.0.0 pkgconfig_2.0.3
#> [9] rlang_0.4.10 reprex_2.0.0 cli_2.4.0 DBI_1.1.1
#> [13] rstudioapi_0.13 parallel_4.0.0 yaml_2.2.1 xfun_0.22
#> [17] withr_2.4.2 knitr_1.32 generics_0.1.0 fs_1.5.0
#> [21] vctrs_0.3.7 globals_0.14.0 tidyselect_1.1.0 doFuture_0.12.0
#> [25] glue_1.4.2 listenv_0.8.0 R6_2.5.0 parallelly_1.24.0
#> [29] fansi_0.4.2 rmarkdown_2.7 purrr_0.3.4 codetools_0.2-18
#> [33] ellipsis_0.3.1 htmltools_0.5.1.1 assertthat_0.2.1 utf8_1.2.1
#> [37] stringi_1.5.3 crayon_1.4.1
问题与 withRestarts 函数的名称有关....请勿使用以 muffle 开头的名称。