doParallel (package) foreach 不适用于 R 中的大迭代
doParallel (package) foreach does not work for big iterations in R
我正在 运行在具有 4 个和 8 个物理和逻辑内核的 PC (OS Linux) 上使用以下代码(从 doParallel's Vignettes 中提取),分别。
运行 iter=1e+6
或更少的代码,一切都很好,我可以从 CPU 用法中看到所有内核都用于此计算。然而,随着迭代次数的增加(例如 iter=4e+6
),并行计算似乎在这种情况下不起作用。当我还监控 CPU 使用情况时,只有一个核心参与计算(100% 使用)。
示例 1
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}
})[3]
您知道可能是什么原因吗?内存可能是原因吗?
我四处搜索,发现 THIS 与我的问题相关,但关键是我没有收到任何类型的错误,而且 OP 似乎已经通过在 foreach
循环。但是可以看出,我的循环中没有使用包。
更新1
我的问题还是没有解决。根据我的实验,我不认为内存可能是原因。我在系统上有 8GB 的内存,我在上面 运行 以下简单的并行(在所有 8 个逻辑核心上)迭代:
示例 2
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
i
}
})[3]
我对 运行 这段代码没有问题,但是当我监控 CPU 使用情况时,只有一个核心(8 个核心)是 100%。
更新2
至于 Example2,@SteveWeston(感谢您指出这一点)表示(在评论中):"The example in your update is suffering from having tiny tasks. Only the master has any real work to do, which consists of sending tasks and processing results. That's fundamentally different than the problem with the original example which did use multiple cores on a smaller number of iterations."
然而,Example1仍然没有解决。当我 运行 它并使用 htop
监视进程时,这里是更详细的情况:
让我们将所有 8 个创建的进程命名为 p1
到 p8
。 p1
的状态(htop
中的 S
列)是 R
,这意味着它是 运行ning 并且保持不变。但是,对于 p2
到 p8
,几分钟后,状态变为 D
(即不间断睡眠),几分钟后再次变为 Z
(即终止但未被其父级收割)。你知道为什么会这样吗?
我认为您 运行 内存不足。这是该示例的修改版本,当您有许多任务时,它应该能更好地工作。它使用 doSNOW 而不是 doParallel,因为 doSNOW 允许您使用 combine 函数处理工作人员返回的结果。此示例将这些结果写入一个文件以使用更少的内存,但是它在最后使用“.final”函数将结果读回内存,但如果您没有足够的内存,您可以跳过它。
library(doSNOW)
library(tcltk)
nw <- 4 # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)
x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000 # may require tuning for your machine
maxcomb <- nw + 1 # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)
comb <- function(fobj, ...) {
for(r in list(...))
writeBin(r, fobj)
fobj
}
final <- function(fobj) {
close(fobj)
t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}
mkprogress <- function(total) {
pb <- tkProgressBar(max=total,
label=sprintf('total tasks: %d', total))
function(n, tag) {
setTkProgressBar(pb, n,
label=sprintf('last completed task: %d of %d', tag, total))
}
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')
r <-
foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
.maxcombine=maxcomb, .init=resultFile, .final=final,
.inorder=FALSE, .options.snow=opts) %dopar% {
do.call('c', lapply(seq_len(n), function(i) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}))
}
我加入了一个进度条,因为这个例子需要几个小时才能执行。
请注意,此示例还使用 iterators
包中的 idiv
函数来增加每个任务的工作量。这种技术称为 chunking,通常可以提高并行性能。但是,使用 idiv
会弄乱任务索引,因为变量 i
现在是每个任务的索引而不是全局索引。对于全局索引,您可以编写一个包装 idiv
:
的自定义迭代器
idivix <- function(n, chunkSize) {
i <- 1
it <- idiv(n, chunkSize=chunkSize)
nextEl <- function() {
m <- nextElem(it) # may throw 'StopIterator'
value <- list(i=i, m=m)
i <<- i + m
value
}
obj <- list(nextElem=nextEl)
class(obj) <- c('abstractiter', 'iter')
obj
}
此迭代器发出的值是列表,每个列表包含一个起始索引和一个计数。这是一个使用此自定义迭代器的简单 foreach 循环:
r <-
foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
i
}))
}
当然,如果任务的计算强度足够大,您可能不需要分块,可以像原始示例中那样使用简单的 foreach 循环。
起初我以为你 运行 遇到了内存问题,因为提交许多任务确实会占用更多内存,最终会导致主进程陷入困境,所以我的原始答案展示了几种使用技巧更少的内存。但是,现在听起来好像有一个启动和关闭阶段,只有 master 进程在忙,但是 workers 在中间有一段时间很忙。我认为问题是这个例子中的任务并不是真正的计算密集型,所以当你有很多任务时,你会开始真正注意到启动和关闭时间。我对实际计算进行了计时,发现每个任务只需要大约 3 毫秒。在过去,你不会从并行计算中获得任何小任务的好处,但现在,根据你的机器,你可以获得一些好处,但开销很大,所以当你有很多任务时,你真的会注意到开销。
我仍然认为我的另一个答案对这个问题很有效,但是既然你有足够的内存,那就太过分了。使用分块的最重要技术。这是一个使用分块的示例,对原始示例进行了最小的更改:
require("doParallel")
nw <- 8
registerDoParallel(nw)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
niter <- 4e+6
r <- foreach(n=idiv(niter, chunks=nw), .combine='rbind') %dopar% {
do.call('rbind', lapply(seq_len(n), function(i) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}))
}
请注意,这与我的其他回答略有不同。通过使用 idiv chunks
选项而不是 chunkSize
选项,它只为每个工作人员使用一个任务。这减少了 master 完成的工作量,如果你有足够的内存,这是一个很好的策略。
我正在 运行在具有 4 个和 8 个物理和逻辑内核的 PC (OS Linux) 上使用以下代码(从 doParallel's Vignettes 中提取),分别。
运行 iter=1e+6
或更少的代码,一切都很好,我可以从 CPU 用法中看到所有内核都用于此计算。然而,随着迭代次数的增加(例如 iter=4e+6
),并行计算似乎在这种情况下不起作用。当我还监控 CPU 使用情况时,只有一个核心参与计算(100% 使用)。
示例 1
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}
})[3]
您知道可能是什么原因吗?内存可能是原因吗?
我四处搜索,发现 THIS 与我的问题相关,但关键是我没有收到任何类型的错误,而且 OP 似乎已经通过在 foreach
循环。但是可以看出,我的循环中没有使用包。
更新1
我的问题还是没有解决。根据我的实验,我不认为内存可能是原因。我在系统上有 8GB 的内存,我在上面 运行 以下简单的并行(在所有 8 个逻辑核心上)迭代:
示例 2
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
i
}
})[3]
我对 运行 这段代码没有问题,但是当我监控 CPU 使用情况时,只有一个核心(8 个核心)是 100%。
更新2
至于 Example2,@SteveWeston(感谢您指出这一点)表示(在评论中):"The example in your update is suffering from having tiny tasks. Only the master has any real work to do, which consists of sending tasks and processing results. That's fundamentally different than the problem with the original example which did use multiple cores on a smaller number of iterations."
然而,Example1仍然没有解决。当我 运行 它并使用 htop
监视进程时,这里是更详细的情况:
让我们将所有 8 个创建的进程命名为 p1
到 p8
。 p1
的状态(htop
中的 S
列)是 R
,这意味着它是 运行ning 并且保持不变。但是,对于 p2
到 p8
,几分钟后,状态变为 D
(即不间断睡眠),几分钟后再次变为 Z
(即终止但未被其父级收割)。你知道为什么会这样吗?
我认为您 运行 内存不足。这是该示例的修改版本,当您有许多任务时,它应该能更好地工作。它使用 doSNOW 而不是 doParallel,因为 doSNOW 允许您使用 combine 函数处理工作人员返回的结果。此示例将这些结果写入一个文件以使用更少的内存,但是它在最后使用“.final”函数将结果读回内存,但如果您没有足够的内存,您可以跳过它。
library(doSNOW)
library(tcltk)
nw <- 4 # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)
x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000 # may require tuning for your machine
maxcomb <- nw + 1 # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)
comb <- function(fobj, ...) {
for(r in list(...))
writeBin(r, fobj)
fobj
}
final <- function(fobj) {
close(fobj)
t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}
mkprogress <- function(total) {
pb <- tkProgressBar(max=total,
label=sprintf('total tasks: %d', total))
function(n, tag) {
setTkProgressBar(pb, n,
label=sprintf('last completed task: %d of %d', tag, total))
}
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')
r <-
foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
.maxcombine=maxcomb, .init=resultFile, .final=final,
.inorder=FALSE, .options.snow=opts) %dopar% {
do.call('c', lapply(seq_len(n), function(i) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}))
}
我加入了一个进度条,因为这个例子需要几个小时才能执行。
请注意,此示例还使用 iterators
包中的 idiv
函数来增加每个任务的工作量。这种技术称为 chunking,通常可以提高并行性能。但是,使用 idiv
会弄乱任务索引,因为变量 i
现在是每个任务的索引而不是全局索引。对于全局索引,您可以编写一个包装 idiv
:
idivix <- function(n, chunkSize) {
i <- 1
it <- idiv(n, chunkSize=chunkSize)
nextEl <- function() {
m <- nextElem(it) # may throw 'StopIterator'
value <- list(i=i, m=m)
i <<- i + m
value
}
obj <- list(nextElem=nextEl)
class(obj) <- c('abstractiter', 'iter')
obj
}
此迭代器发出的值是列表,每个列表包含一个起始索引和一个计数。这是一个使用此自定义迭代器的简单 foreach 循环:
r <-
foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
i
}))
}
当然,如果任务的计算强度足够大,您可能不需要分块,可以像原始示例中那样使用简单的 foreach 循环。
起初我以为你 运行 遇到了内存问题,因为提交许多任务确实会占用更多内存,最终会导致主进程陷入困境,所以我的原始答案展示了几种使用技巧更少的内存。但是,现在听起来好像有一个启动和关闭阶段,只有 master 进程在忙,但是 workers 在中间有一段时间很忙。我认为问题是这个例子中的任务并不是真正的计算密集型,所以当你有很多任务时,你会开始真正注意到启动和关闭时间。我对实际计算进行了计时,发现每个任务只需要大约 3 毫秒。在过去,你不会从并行计算中获得任何小任务的好处,但现在,根据你的机器,你可以获得一些好处,但开销很大,所以当你有很多任务时,你真的会注意到开销。
我仍然认为我的另一个答案对这个问题很有效,但是既然你有足够的内存,那就太过分了。使用分块的最重要技术。这是一个使用分块的示例,对原始示例进行了最小的更改:
require("doParallel")
nw <- 8
registerDoParallel(nw)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
niter <- 4e+6
r <- foreach(n=idiv(niter, chunks=nw), .combine='rbind') %dopar% {
do.call('rbind', lapply(seq_len(n), function(i) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}))
}
请注意,这与我的其他回答略有不同。通过使用 idiv chunks
选项而不是 chunkSize
选项,它只为每个工作人员使用一个任务。这减少了 master 完成的工作量,如果你有足够的内存,这是一个很好的策略。