用 doMC 替换平行 plyr
Replacement for parallel plyr with doMC
考虑对 data.frame:
的标准分组操作library(plyr)
library(doMC)
library(MASS) # for example
nc <- 12
registerDoMC(nc)
d <- data.frame(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"
res <- ddply(d, .(g), function(d_group) {
# slow, complicated operations on d_group
}, .parallel = FALSE)
通过简单地编写 .parallel = TRUE
来利用多核设置是微不足道的。这是我最喜欢的 plyr 功能之一。
但是随着 plyr 被弃用(我认为)并且基本上被 dplyr、purrr 等取代,并行处理的解决方案变得更加冗长:
library(dplyr)
library(multidplyr)
library(parallel)
library(MASS) # for example
nc <- 12
d <- tibble(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"
cl <- create_cluster(nc)
set_default_cluster(cl)
cluster_library(cl, packages = c("MASS"))
cluster_copy(cl, obj = y)
d_parts <- d %>% partition(g, cluster = cl)
res <- d_parts %>% collect() %>% ungroup()
rm(d_parts)
rm(cl)
考虑到循环内所需的每个包和对象都需要自己的 cluster_*
命令将其复制到节点,您可以想象这个示例需要多长时间。非并行化的 plyr 到 dplyr 转换只是一个简单的 dplyr::group_by
构造,不幸的是没有简洁的方法来启用并行处理。所以,我的问题是:
- 这实际上是将我的代码从 plyr 转换为 dplyr 的首选方式吗?
- 在 plyr 的幕后发生了什么样的魔法,使得开启并行处理变得如此容易?是否有理由将此功能添加到 dplyr 中特别困难,这就是它尚不存在的原因?
- 我的两个例子在代码的执行方式上有根本的不同吗?
我认为没有一种真正的 'prefered' 方法可以将 {plyr} 代码转换为 {dplyr}。
在评论中,@Aurèle 在描述 {plyr} 和 {doMC} 之间的联系方面比我做得更好。发生的一件事是激励措施发生了一些变化。 {doMC} 来自 Revolution Analytics(自从被 Microsoft 购买)。但是开发了 dplyr 的 Hadley 目前在 RStudio 工作。这两家公司竞争 IDE space。因此,他们的软件包无法很好地协同工作也许是很自然的。我看到 RStudio 强烈支持的唯一并行形式是 {sparklyr},他们已经相对 'easy' 设置了它。但是,我真的不能推荐使用 Spark 来为单台机器进行并行处理。
@Aurèle 再次很好地解释了执行差异。您的新代码使用 PSOCK 集群,而旧代码使用分叉。 Fork 使用写时复制模式来访问 RAM,因此并行进程可以立即开始访问相同的数据 post fork。 PSOCK 集群就像生成 R 的新副本 - 它们必须加载库并接收数据的显式副本。
您可以使用像这样的模式...
library(dplyr)
library(purrr)
library(future)
plan(multicore)
options(mc.cores = availableCores())
d <- data.frame(x = 1:8, g = c("group1", "group2", "group3", "group4"))
y <- "some global object"
split(d, d$g) %>%
map(~ future({Sys.sleep(5);mean(.x$x)})) %>%
map_df(~value(.x))
...在 map_df
步骤上进行一些技巧以进行一些并行处理。请注意,在 {purrr} 下,~ 是匿名函数语法,其中 .x 是已映射到的值。
如果你喜欢危险的生活,你可以通过在 {purrr}
中使用私有方法,在不使用 {future} 的情况下创建类似的版本mcmap <- function(.x, .f, ...) {
.f <- as_mapper(.f, ...)
mclapply(.x, function(.x) {
force(.f)
.Call(purrr:::map_impl, environment(), ".x", ".f", "list")
}) %>%
map(~ .x[[1]])
}