如何在 R 中将 `foreach` 和 `%dopar%` 与 `R6` class 一起使用?
How to use `foreach` and `%dopar%` with an `R6` class in R?
我 运行 遇到一个问题,试图将 %dopar%
和 foreach()
与 R6
class 一起使用。四处搜索,我只能找到两个与此相关的资源,R6
存储库中一个未回答的 SO question and an open GitHub issue。
在一条评论(即 GitHub 问题)中,建议通过将 class 的 parent_env
重新分配为 SomeClass$parent_env <- environment()
来解决此问题。我想了解在 foreach
的 %dopar%
中调用此表达式(即 SomeClass$parent_env <- environment()
)时 environment()
究竟指的是什么?
这是一个最小的可重现示例:
Work <- R6::R6Class("Work",
public = list(
values = NULL,
initialize = function() {
self$values <- "some values"
}
)
)
现在,下面的Task
class在构造函数中使用了Work
class
Task <- R6::R6Class("Task",
private = list(
..work = NULL
),
public = list(
initialize = function(time) {
private$..work <- Work$new()
Sys.sleep(time)
}
),
active = list(
work = function() {
return(private$..work)
}
)
)
在Factory
class中创建了Task
class,在..m.thread()
中实现了foreach
。
Factory<- R6::R6Class("Factory",
private = list(
..warehouse = list(),
..amount = NULL,
..parallel = NULL,
..m.thread = function(object, ...) {
cluster <- parallel::makeCluster(parallel::detectCores() - 1)
doParallel::registerDoParallel(cluster)
private$..warehouse <- foreach::foreach(1:private$..amount, .export = c("Work")) %dopar% {
# What exactly does `environment()` encapsulate in this context?
object$parent_env <- environment()
object$new(...)
}
parallel::stopCluster(cluster)
},
..s.thread = function(object, ...) {
for (i in 1:private$..amount) {
private$..warehouse[[i]] <- object$new(...)
}
},
..run = function(object, ...) {
if(private$..parallel) {
private$..m.thread(object, ...)
} else {
private$..s.thread(object, ...)
}
}
),
public = list(
initialize = function(object, ..., amount = 10, parallel = FALSE) {
private$..amount = amount
private$..parallel = parallel
private$..run(object, ...)
}
),
active = list(
warehouse = function() {
return(private$..warehouse)
}
)
)
那么,它被称为:
library(foreach)
x = Factory$new(Task, time = 2, amount = 10, parallel = TRUE)
如果没有以下行 object$parent_env <- environment()
,它会引发错误(即,如其他两个链接中所述):Error in { : task 1 failed - "object 'Work' not found"
.
我想知道,(1) 在 foreach
中分配 parent_env
时有哪些潜在的陷阱,以及 (2) 为什么它首先起作用?
更新 1:
- 我从
foreach()
中返回了 environment()
,因此 private$..warehouse
捕获了那些环境
- 在调试会话中使用
rlang::env_print()
(即 browser()
语句被放置在 foreach
结束执行之后)它们的组成如下:
Browse[1]> env_print(private$..warehouse[[1]])
# <environment: 000000001A8332F0>
# parent: <environment: global>
# bindings:
# * Work: <S3: R6ClassGenerator>
# * ...: <...>
Browse[1]> env_print(environment())
# <environment: 000000001AC0F890>
# parent: <environment: 000000001AC20AF0>
# bindings:
# * private: <env>
# * cluster: <S3: SOCKcluster>
# * ...: <...>
Browse[1]> env_print(parent.env(environment()))
# <environment: 000000001AC20AF0>
# parent: <environment: global>
# bindings:
# * private: <env>
# * self: <S3: Factory>
Browse[1]> env_print(parent.env(parent.env(environment())))
# <environment: global>
# parent: <environment: package:rlang>
# bindings:
# * Work: <S3: R6ClassGenerator>
# * .Random.seed: <int>
# * Factory: <S3: R6ClassGenerator>
# * Task: <S3: R6ClassGenerator>
免责声明:我在这里所说的很多内容都是基于我所知道的有根据的猜测和推论,
我不能保证一切都是 100% 正确的。
我认为可能会有很多陷阱,
哪一个适用真的取决于你做什么。
我认为你的第二个问题更重要,
因为如果你明白这一点,
您将能够自己评估一些陷阱。
题目比较复杂,
但您或许可以从阅读 R's lexical scoping 开始。
本质上,R 有一种环境层次结构,
当执行 R 代码时,
在当前环境中找不到其值的变量
(这就是 environment()
returns)
在 parent 环境中寻找
(不要与调用者环境混淆)。
根据您链接的 GitHub 问题,
R6
生成器将 "reference" 保存到他们的 parent 环境中,
他们希望他们的 类 可能需要的一切都可以在上述 parent 或环境层次结构中的某个地方找到,
从 parent 开始,一直到 "up"。
您使用的解决方法之所以有效,是因为您将生成器的 parent 环境替换为并行工作程序中当前 foreach
调用中的环境
(可能是不同的 R 进程,不一定是不同的线程),
并且,鉴于您的 .export
规范可能会导出必要的值,
然后,R 的词法范围界定可以从单独的 thread/process.
中的 foreach
调用开始搜索缺失值
对于您链接的具体示例,
我发现一个更简单的方法让它工作
(至少在我的 Linux 机器上)
就是做以下事情:
library(doParallel)
cluster <- parallel::makeCluster(parallel::detectCores() - 1)
doParallel::registerDoParallel(cluster)
parallel::clusterExport(cluster, setdiff(ls(), "cluster"))
x = Factory$new(Task, time = 1, amount = 3)
但 将 ..m.thread
函数保留为:
..m.thread = function(object, amount, ...) {
private$..warehouse <- foreach::foreach(1:amount) %dopar% {
object$new(...)
}
}
(完成后手动调用 stopCluster
)。
clusterExport
调用的语义应类似于*:
从主 R 进程的全局环境中获取除 cluster
之外的所有内容,
并使其在每个并行工作者的全局环境中可用。
这样,当词法作用域到达它们各自的全局环境时,foreach
调用中的任何代码都可以使用生成器。
foreach
可以很聪明,自动导出一些变量
(如 GitHub 问题所示),
但它有局限性,
并且在词法作用域中使用的层次结构会变得非常混乱。
*我说 "similar to" 因为我不知道如果使用分叉,R 到底做了什么来区分(全局)环境,
但由于需要导出,
我假设他们确实是相互独立的。
PS:如果您在函数调用中创建工人,我会调用 on.exit(parallel::stopCluster(cluster))
,
这样你就可以避免在发生错误时将进程留在原处,直到它们以某种方式停止。
我 运行 遇到一个问题,试图将 %dopar%
和 foreach()
与 R6
class 一起使用。四处搜索,我只能找到两个与此相关的资源,R6
存储库中一个未回答的 SO question and an open GitHub issue。
在一条评论(即 GitHub 问题)中,建议通过将 class 的 parent_env
重新分配为 SomeClass$parent_env <- environment()
来解决此问题。我想了解在 foreach
的 %dopar%
中调用此表达式(即 SomeClass$parent_env <- environment()
)时 environment()
究竟指的是什么?
这是一个最小的可重现示例:
Work <- R6::R6Class("Work",
public = list(
values = NULL,
initialize = function() {
self$values <- "some values"
}
)
)
现在,下面的Task
class在构造函数中使用了Work
class
Task <- R6::R6Class("Task",
private = list(
..work = NULL
),
public = list(
initialize = function(time) {
private$..work <- Work$new()
Sys.sleep(time)
}
),
active = list(
work = function() {
return(private$..work)
}
)
)
在Factory
class中创建了Task
class,在..m.thread()
中实现了foreach
。
Factory<- R6::R6Class("Factory",
private = list(
..warehouse = list(),
..amount = NULL,
..parallel = NULL,
..m.thread = function(object, ...) {
cluster <- parallel::makeCluster(parallel::detectCores() - 1)
doParallel::registerDoParallel(cluster)
private$..warehouse <- foreach::foreach(1:private$..amount, .export = c("Work")) %dopar% {
# What exactly does `environment()` encapsulate in this context?
object$parent_env <- environment()
object$new(...)
}
parallel::stopCluster(cluster)
},
..s.thread = function(object, ...) {
for (i in 1:private$..amount) {
private$..warehouse[[i]] <- object$new(...)
}
},
..run = function(object, ...) {
if(private$..parallel) {
private$..m.thread(object, ...)
} else {
private$..s.thread(object, ...)
}
}
),
public = list(
initialize = function(object, ..., amount = 10, parallel = FALSE) {
private$..amount = amount
private$..parallel = parallel
private$..run(object, ...)
}
),
active = list(
warehouse = function() {
return(private$..warehouse)
}
)
)
那么,它被称为:
library(foreach)
x = Factory$new(Task, time = 2, amount = 10, parallel = TRUE)
如果没有以下行 object$parent_env <- environment()
,它会引发错误(即,如其他两个链接中所述):Error in { : task 1 failed - "object 'Work' not found"
.
我想知道,(1) 在 foreach
中分配 parent_env
时有哪些潜在的陷阱,以及 (2) 为什么它首先起作用?
更新 1:
- 我从
foreach()
中返回了environment()
,因此private$..warehouse
捕获了那些环境 - 在调试会话中使用
rlang::env_print()
(即browser()
语句被放置在foreach
结束执行之后)它们的组成如下:
Browse[1]> env_print(private$..warehouse[[1]])
# <environment: 000000001A8332F0>
# parent: <environment: global>
# bindings:
# * Work: <S3: R6ClassGenerator>
# * ...: <...>
Browse[1]> env_print(environment())
# <environment: 000000001AC0F890>
# parent: <environment: 000000001AC20AF0>
# bindings:
# * private: <env>
# * cluster: <S3: SOCKcluster>
# * ...: <...>
Browse[1]> env_print(parent.env(environment()))
# <environment: 000000001AC20AF0>
# parent: <environment: global>
# bindings:
# * private: <env>
# * self: <S3: Factory>
Browse[1]> env_print(parent.env(parent.env(environment())))
# <environment: global>
# parent: <environment: package:rlang>
# bindings:
# * Work: <S3: R6ClassGenerator>
# * .Random.seed: <int>
# * Factory: <S3: R6ClassGenerator>
# * Task: <S3: R6ClassGenerator>
免责声明:我在这里所说的很多内容都是基于我所知道的有根据的猜测和推论, 我不能保证一切都是 100% 正确的。
我认为可能会有很多陷阱, 哪一个适用真的取决于你做什么。 我认为你的第二个问题更重要, 因为如果你明白这一点, 您将能够自己评估一些陷阱。
题目比较复杂,
但您或许可以从阅读 R's lexical scoping 开始。
本质上,R 有一种环境层次结构,
当执行 R 代码时,
在当前环境中找不到其值的变量
(这就是 environment()
returns)
在 parent 环境中寻找
(不要与调用者环境混淆)。
根据您链接的 GitHub 问题,
R6
生成器将 "reference" 保存到他们的 parent 环境中,
他们希望他们的 类 可能需要的一切都可以在上述 parent 或环境层次结构中的某个地方找到,
从 parent 开始,一直到 "up"。
您使用的解决方法之所以有效,是因为您将生成器的 parent 环境替换为并行工作程序中当前 foreach
调用中的环境
(可能是不同的 R 进程,不一定是不同的线程),
并且,鉴于您的 .export
规范可能会导出必要的值,
然后,R 的词法范围界定可以从单独的 thread/process.
foreach
调用开始搜索缺失值
对于您链接的具体示例, 我发现一个更简单的方法让它工作 (至少在我的 Linux 机器上) 就是做以下事情:
library(doParallel)
cluster <- parallel::makeCluster(parallel::detectCores() - 1)
doParallel::registerDoParallel(cluster)
parallel::clusterExport(cluster, setdiff(ls(), "cluster"))
x = Factory$new(Task, time = 1, amount = 3)
但 将 ..m.thread
函数保留为:
..m.thread = function(object, amount, ...) {
private$..warehouse <- foreach::foreach(1:amount) %dopar% {
object$new(...)
}
}
(完成后手动调用 stopCluster
)。
clusterExport
调用的语义应类似于*:
从主 R 进程的全局环境中获取除 cluster
之外的所有内容,
并使其在每个并行工作者的全局环境中可用。
这样,当词法作用域到达它们各自的全局环境时,foreach
调用中的任何代码都可以使用生成器。
foreach
可以很聪明,自动导出一些变量
(如 GitHub 问题所示),
但它有局限性,
并且在词法作用域中使用的层次结构会变得非常混乱。
*我说 "similar to" 因为我不知道如果使用分叉,R 到底做了什么来区分(全局)环境, 但由于需要导出, 我假设他们确实是相互独立的。
PS:如果您在函数调用中创建工人,我会调用 on.exit(parallel::stopCluster(cluster))
,
这样你就可以避免在发生错误时将进程留在原处,直到它们以某种方式停止。