R parLapply 不平行
R parLapply not parallel
我目前正在开发一个 R 包,它将通过 "parallel" 包使用并行计算来解决一些任务。
在使用我的包的函数内部定义的集群时,我遇到了一些非常尴尬的行为,其中 parLapply 函数将一个工作分配给一个工作人员并等待它完成以将工作分配给下一个工作人员。
或者至少这似乎是正在发生的事情,通过观察日志文件 "cluster.log" 和 unix shell.
中的 运行ning 进程列表
下面是我的包中声明的原始函数的模型版本:
.parSolver <- function( varMatrix, var1 ) {
no_cores <- detectCores()
#Rows in varMatrix
rows <- 1:nrow(varMatrix[,])
# Split rows in n parts
n <- no_cores
parts <- split(rows, cut(rows, n))
# Initiate cluster
cl <- makePSOCKcluster(no_cores, methods = FALSE, outfile = "/home/cluster.log")
clusterEvalQ(cl, library(raster))
clusterExport(cl, "varMatrix", envir=environment())
clusterExport(cl, "var1", envir=environment())
rParts <- parLapply(cl = cl, X = 1:n, fun = function(x){
part <- rasterize(varMatrix[parts[[x]],], raster(var1), .....)
print(x)
return(part)
})
do.call(merge, rParts)
}
备注:
- 我正在使用 makePSOCKcluster,因为我希望代码在 windows 和类似的 unix 系统上 运行 尽管这个特殊问题只在 unix 系统中表现出来。
- 函数 rasterize 和 raster 在 library(raster) 中定义,导出到集群。
对我来说奇怪的是,如果我在全局环境中执行函数 parSolver 的完全相同的代码,一切都会顺利进行,所有工作人员同时接受一份工作,任务会立即完成。
但是,如果我做类似的事情:
library(myPackage)
varMatrix <- (...)
var1 <- (...)
result <- parSolver(varMatrix, var1)
出现描述的问题。
这似乎是一个负载平衡问题,但这并不能解释为什么它在一种情况下可以正常工作而在另一种情况下却不能。
我是不是漏掉了什么?
提前致谢。
我不认为 parLapply
是 运行ning 顺序。更有可能的是,它只是 运行 效率低下,使其看起来 运行 顺序。
我有几点改进建议:
- 不要在里面定义worker函数
parSolver
- 不要将所有
varMatrix
导出给每个工人
- 在
parSolver
之外创建集群
第一点很重要,因为正如您现在的示例所示,parSolver
中定义的所有变量都将与匿名工作函数一起序列化,并由 parLapply
发送给工作人员。通过在任何函数之外定义辅助函数,序列化将不会捕获任何不需要的变量。
第二点避免了不必要的套接字I/O并且使用更少的内存,使代码更具可扩展性。
这是一个与您的示例类似的伪造但独立的示例,可以证明我的建议:
# Define worker function outside of any function to avoid
# serialization problems (such as unexpected variable capture)
workerfn <- function(mat, var1) {
library(raster)
mat * var1
}
parSolver <- function(cl, varMatrix, var1) {
parts <- splitIndices(nrow(varMatrix), length(cl))
varMatrixParts <- lapply(parts, function(i) varMatrix[i,,drop=FALSE])
rParts <- clusterApply(cl, varMatrixParts, workerfn, var1)
do.call(rbind, rParts)
}
library(parallel)
cl <- makePSOCKcluster(3)
r <- parSolver(cl, matrix(1:20, 10, 2), 2)
print(r)
请注意,这利用 clusterApply
函数迭代 varMatrix
的行块列表,因此不需要将整个矩阵发送给每个人。它还避免了对 clusterEvalQ
和 clusterExport
的调用,简化了代码,并提高了效率。
我目前正在开发一个 R 包,它将通过 "parallel" 包使用并行计算来解决一些任务。
在使用我的包的函数内部定义的集群时,我遇到了一些非常尴尬的行为,其中 parLapply 函数将一个工作分配给一个工作人员并等待它完成以将工作分配给下一个工作人员。 或者至少这似乎是正在发生的事情,通过观察日志文件 "cluster.log" 和 unix shell.
中的 运行ning 进程列表下面是我的包中声明的原始函数的模型版本:
.parSolver <- function( varMatrix, var1 ) {
no_cores <- detectCores()
#Rows in varMatrix
rows <- 1:nrow(varMatrix[,])
# Split rows in n parts
n <- no_cores
parts <- split(rows, cut(rows, n))
# Initiate cluster
cl <- makePSOCKcluster(no_cores, methods = FALSE, outfile = "/home/cluster.log")
clusterEvalQ(cl, library(raster))
clusterExport(cl, "varMatrix", envir=environment())
clusterExport(cl, "var1", envir=environment())
rParts <- parLapply(cl = cl, X = 1:n, fun = function(x){
part <- rasterize(varMatrix[parts[[x]],], raster(var1), .....)
print(x)
return(part)
})
do.call(merge, rParts)
}
备注:
- 我正在使用 makePSOCKcluster,因为我希望代码在 windows 和类似的 unix 系统上 运行 尽管这个特殊问题只在 unix 系统中表现出来。
- 函数 rasterize 和 raster 在 library(raster) 中定义,导出到集群。
对我来说奇怪的是,如果我在全局环境中执行函数 parSolver 的完全相同的代码,一切都会顺利进行,所有工作人员同时接受一份工作,任务会立即完成。 但是,如果我做类似的事情:
library(myPackage)
varMatrix <- (...)
var1 <- (...)
result <- parSolver(varMatrix, var1)
出现描述的问题。
这似乎是一个负载平衡问题,但这并不能解释为什么它在一种情况下可以正常工作而在另一种情况下却不能。
我是不是漏掉了什么? 提前致谢。
我不认为 parLapply
是 运行ning 顺序。更有可能的是,它只是 运行 效率低下,使其看起来 运行 顺序。
我有几点改进建议:
- 不要在里面定义worker函数
parSolver
- 不要将所有
varMatrix
导出给每个工人 - 在
parSolver
之外创建集群
第一点很重要,因为正如您现在的示例所示,parSolver
中定义的所有变量都将与匿名工作函数一起序列化,并由 parLapply
发送给工作人员。通过在任何函数之外定义辅助函数,序列化将不会捕获任何不需要的变量。
第二点避免了不必要的套接字I/O并且使用更少的内存,使代码更具可扩展性。
这是一个与您的示例类似的伪造但独立的示例,可以证明我的建议:
# Define worker function outside of any function to avoid
# serialization problems (such as unexpected variable capture)
workerfn <- function(mat, var1) {
library(raster)
mat * var1
}
parSolver <- function(cl, varMatrix, var1) {
parts <- splitIndices(nrow(varMatrix), length(cl))
varMatrixParts <- lapply(parts, function(i) varMatrix[i,,drop=FALSE])
rParts <- clusterApply(cl, varMatrixParts, workerfn, var1)
do.call(rbind, rParts)
}
library(parallel)
cl <- makePSOCKcluster(3)
r <- parSolver(cl, matrix(1:20, 10, 2), 2)
print(r)
请注意,这利用 clusterApply
函数迭代 varMatrix
的行块列表,因此不需要将整个矩阵发送给每个人。它还避免了对 clusterEvalQ
和 clusterExport
的调用,简化了代码,并提高了效率。