如何使用 R future 包在集群内进行并行计算?

How to do parallel computing inside a cluster with the R future package?

我想在集群(多台机器)的节点内分配作业(使用 for 循环)。我尝试使用 R 包 future 来做到这一点。我不知道这是否是最好的方法;我尝试使用 doParallel 包的 foreach,但没有成功。如何判断循环迭代次数何时大于集群节点数?

library(doParallel);
library(doFuture);
#library(future);

registerDoFuture();

workers <- c(rep("129.20.25.61",1), rep("129.20.25.217",1));
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "", verbose = FALSE);

plan(cluster, workers = cl)

mu <- 1.0
sigma <- 2.0

for(i in 1:3){
 res %<-%{ rnorm(i, mean = mu, sd = sigma)}
 print(i);
}

如果你使用普通的 Future API,即 future() + value()%<-%,则不需要涉及 foreach,doFuture 等。这是如何单独使用 Future API 以及预期的输出:

(A) 设置工人

library("future")

workers <- c("129.20.25.61", "129.20.25.217")
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "")
### starting worker pid=20026 on localhost:11900 at 11:47:28.334
### starting worker pid=12291 on localhost:11901 at 11:47:37.172

print(cl)
### socket cluster with 2 nodes on hosts '129.20.25.61', '129.20.25.217'

plan(cluster, workers = cl)

(B) 显式未来 API

在这里,我们使用 future() 显式创建一个期货列表,并使用 values() 检索它们的值(基本上等于调用 lapply(f, FUN = value))。

mu <- 1.0
sigma <- 2.0

f <- list()
for (i in 1:3) {
  f[[i]] <- future({ rnorm(i, mean = mu, sd = sigma) })
}
v <- values(f)
str(v)
### List of 3
###  $ : num 3.25
###  $ : num [1:2] 3.24 3.29
###  $ : num [1:3] 1.251 2.299 0.923

(C) 隐式未来 API

在这个替代方案中,我们使用未来赋值运算符 %<-% 隐式创建未来(当您尝试访问未来的价值时,它在内部将执行 future() 然后 value() )。由于 %<-% 只能分配给环境(而不是列表、data.frames 等),我们需要使用作为环境的容器。这里我使用 listenv class,这是一个环境,但允许您将其索引为列表。

library("listenv")  ## listenv()
mu <- 1.0
sigma <- 2.0

v <- listenv()
for (i in 1:3) {
  v[[i]] %<-% { rnorm(i, mean = mu, sd = sigma) }
}
v <- as.list(v)
str(v)
### List of 3
###  $ : num 1.15
###  $ : num [1:2] 2.2277 -0.0164
###  $ : num [1:3] -2.09 3.34 -1.09

(D) 使用 future_lapply()

如果您更喜欢 lapply() 式的方法,您可以这样做:

v <- future_lapply(1:3, FUN = function(i) {
  rnorm(i, mean = mu, sd = sigma)
})
str(v)
### List of 3
###  $ : num 2.12
###  $ : num [1:2] 2.56 -1.21
###  $ : num [1:3] 2.89 -0.159 -0.983

(D) 使用 foreach()

如果您想使用foreach(),那么您可以按如下方式操作。请注意,在每个 foreach 设计中使用 foreach() 时最好始终显式导出全局变量 - 但是,如果您 总是 使用 doFuture 实际上不需要。

library("doFuture")
registerDoFuture()
workers <- c("129.20.25.61", "129.20.25.217")
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "")
plan(cluster, workers = cl)

v <- foreach(i = 1:3, .export = c("mu", "sigma")) %dopar% {
  rnorm(i, mean = mu, sd = sigma)
}
str(v)
### List of 3
###  $ : num 3.12
###  $ : num [1:2] -0.0887 -2.8016
###  $ : num [1:3] 2.15 3.5 -2.24

How can I figure out when the number of the loop iterations is taller than the number of cluster nodes?

我不确定你在这里问什么。您是否担心在拥有工人的同时拥有更多的期货 运行?如果是这样,那将自动得到处理。如果所有 worker 都被占用,那么创建额外的 futures 将阻塞,直到其中一个 worker 再次可用。