R parallel - parRapply not working properly
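I am doing some unit testing on a package in development, and one of the tests is failing. Specifically, I have a parallel version and a non-parallel version of the same code. The non-parallel version works perfectly. The parallel version fails the unit test with a seemingly nonsensical error.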
## load my development package.
library(devtools) # for install_github
install_github("alexwhitworth/imputation")
## do some setup:
library(imputation)
library(kernlab)
library(parallel)
x1 <- matrix(rnorm(200), 20, 10)
x1[x1 > 1.25] <- NA
x3 <- create_canopies(x1, n_canopies= 5, q= 2)
prelim <- imputation:::impute_prelim(x3[[1]], parallel= TRUE, leave_cores= 1)
opt_h <- (4 * sd(x3[[1]][, -ncol(x3[[1]])], na.rm=T)^5 / (3 * nrow(x3[[1]])))^(1/5)
kern <- rbfdot(opt_h)
## write 2 identical functions:
## one in parallel
## one not in parallel
foo_parallel <- function(x_missing, x_complete, k, q, leave_cores) {
  cl <- makeCluster(detectCores() - leave_cores)
  x_missing_imputed <- parRapply(cl= cl, x_missing, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))
    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ],
                                                    x_complete[-x_comp_rowID,]), ref= 1L, q= q)
    return(distances)
  }, x_complete= x_complete)
  stopCluster(cl)
  return(x_missing_imputed)
}
foo_nonparallel <- function(x_missing, x_complete, k, q) {
  x_missing_imputed <- t(apply(x_missing, 1, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))
    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ],
                                                    x_complete[-x_comp_rowID,]), ref= 1L, q= q)
    return(distances)
  }, x_complete= x_complete))
  return(x_missing_imputed)
}
## test them
foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 1) # fails
foo_nonparallel(prelim$x_missing, x3[[1]],k=3,q=2) # works
Error in checkForRemoteErrors(val) :
2 nodes produced errors; first error: ref must be an integer in {1, nrow(x)}.
As you can see, ref is explicitly defined as ref= 1L, which lies in {1, nrow(x)}.
What is happening in the interaction with library(parallel) that causes this error?
Edit - I am on a Windows machine:
R> sessionInfo()
R version 3.2.2 (2015-08-14)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 7 x64 (build 7601) Service Pack 1
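I have figured out what is causing the problem. It looks to me like a library(parallel) bug / edge case, specific to the parallel versions of the apply functions (in this case parRapply). Perhaps someone older and wiser can explain why library(parallel) behaves this way for this edge case.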
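The problem appears to be related to the number of tasks relative to the number of available workers. My machine has an 8-core processor. In this case there are 5 tasks (one for each row of prelim$x_missing).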
Granted, in typical use, I wouldn't be parallelizing work for 5 rows. This is just a unit test.
R> prelim$x_missing
X1 X2 X3 X4 X5 X6 X7 X8 X9 X10 d_factor
6 6 0.2604170 -0.5966874 NA NA -0.3013053 0.24313272 0.2836760 0.3977164 -0.60711109 -0.2929253 1
7 7 -0.8540576 0.1409047 NA 0.4801685 -0.9324517 -0.06487733 -0.2220201 NA 1.19077335 -0.3702607 2
8 8 0.5118453 -0.8750674 NA 0.1787238 0.6897163 0.20695122 NA -0.3488021 0.84200408 -0.4791230 1
12 12 0.3695746 -0.4919277 -1.2509180 1.1642152 NA 0.04018417 NA NA -0.53436589 -1.5400345 2
15 15 NA -0.3608242 -0.6761515 -0.5366562 0.1763501 NA NA 0.4967595 0.02635203 -0.6015536 1
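Note that I am creating the cluster via cl <- parallel::makeCluster(detectCores() - leave_cores), where detectCores() returns 8 on my current machine. The function call takes an argument, leave_cores, giving the number of cores to leave free. When I create a cluster with more cores/nodes than there are rows in the use case, the function fails. When I create a cluster with a number of nodes <= the number of rows, the function works: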
# works : detectCores() == 8, 8 - 3 == 5 (number of rows / processes)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 3)
[1] 1.0216313 0.7355635 0.9201501 0.6906554 0.6613939 0.3628872 0.9995641 0.8571252 0.9271800 0.9201501 0.9238215 0.9798824 0.9059506
[14] 0.6891484 1.0158223 0.5442953 0.6906554 0.9238215 0.8607280 0.5897955 1.1084943 0.8518322 0.9227102 0.6613939 0.9798824 0.8607280
[27] 0.9518105 0.9792209 1.1968528 0.4447104 0.3628872 0.9059506 0.5897955 0.9518105 1.1249624
# fails : 8-2 = 6; 6 > nrow(prelim$x_missing)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 2)
Error in checkForRemoteErrors(val) :
one node produced an error: ref must be an integer in {1, nrow(x)}.
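Given the observation above that the call succeeds once the cluster has no more nodes than there are rows, one possible workaround is to cap the worker count at the number of rows. The following is a minimal sketch, not a fix inside library(parallel): cap_workers is a hypothetical helper that does not exist in parallel or in the imputation package, and the values 5 and 8 are taken from this unit test.

```r
library(parallel)

# Hypothetical helper (an assumption, not part of any package):
# choose a worker count that never exceeds the number of rows to process.
cap_workers <- function(n_rows, leave_cores = 1L, n_cores = detectCores()) {
  # detectCores() can return NA on some platforms; fall back to one core.
  if (is.na(n_cores)) n_cores <- 1L
  max(1L, min(n_cores - leave_cores, n_rows))
}

n_rows <- 5L                                   # rows in prelim$x_missing
n_workers <- cap_workers(n_rows, leave_cores = 1L, n_cores = 8L)

# With no more workers than rows, every index chunk should be non-empty.
chunks <- splitIndices(n_rows, n_workers)
print(n_workers)
print(lengths(chunks))
```

Inside foo_parallel, the cluster would then be created with makeCluster(cap_workers(nrow(x_missing), leave_cores)) instead of makeCluster(detectCores() - leave_cores).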
tl;dr
As described in the R parallel vignette, detectCores simply detects cores. Quite reasonably, it makes no attempt to assign tasks to workers intelligently.
Function detectCores() tries to determine the number of CPU cores in the machine on which R is running: it has ways to do so on all known current R platforms. What exactly it measures is OS-specific: we try where possible to report the number of physical cores available. On Windows the default is to report the number of logical CPUs. On modern hardware (e.g. Intel Core i7) the latter may not be unreasonable as hyper-threading does give a significant extra throughput.
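I am calling the function parallel::parRapply to do the computation. parRapply dispatches the work to the workers via the splitRows function, but splitRows does not appear to have any intelligence or error catching.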
R> parRapply
function (cl = NULL, x, FUN, ...)
{
cl <- defaultCluster(cl)
do.call(c, clusterApply(cl = cl, x = splitRows(x, length(cl)),
fun = apply, MARGIN = 1L, FUN = FUN, ...), quote = TRUE)
}
<bytecode: 0x00000000380ca530>
<environment: namespace:parallel>
I could not find the source code for splitRows, but parallel::splitIndices looks similar:
R> parallel:::splitIndices
function (nx, ncl)
{
i <- seq_len(nx)
if (ncl == 0L)
list()
else if (ncl == 1L || nx == 1L)
list(i)
else {
fuzz <- min((nx - 1L)/1000, 0.4 * nx/ncl)
breaks <- seq(1 - fuzz, nx + fuzz, length = ncl + 1L)
structure(split(i, cut(i, breaks)), names = NULL)
}
}
<bytecode: 0x00000000380a7828>
<environment: namespace:parallel>
In my unit test, this would execute as follows:
# all 8 cores:
nx <- 5; ncl <- 8
i <- seq_len(nx)
fuzz <- min((nx - 1L)/1000, 0.4 * nx / ncl)
breaks <- seq(1 - fuzz, nx + fuzz, length= ncl + 1L)
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1
[[2]]
integer(0)
[[3]]
[1] 2
[[4]]
integer(0)
[[5]]
[1] 3
[[6]]
[1] 4
[[7]]
integer(0)
[[8]]
[1] 5
Three of the chunks are integer(0), and these cause the failure further down the call stack.
# 3 cores (just showing the return):
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1 2
[[2]]
[1] 3
[[3]]
[1] 4 5
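To see why empty chunks would matter, here is a small sketch under two assumptions: that splitRows turns each index chunk into a sub-matrix via x[i, , drop = FALSE], and that the toy matrix x stands in for prelim$x_missing. It also relies on the fact that apply() over a matrix with zero rows still calls FUN once, on a dummy vector, to determine the type of the result.

```r
library(parallel)

# A 5-row toy matrix standing in for prelim$x_missing (illustrative only).
x <- matrix(seq_len(15), nrow = 5, ncol = 3)

# Assumed behaviour of splitRows: one sub-matrix per index chunk.
row_chunks <- lapply(splitIndices(nrow(x), 8L),
                     function(i) x[i, , drop = FALSE])
n_rows_per_chunk <- vapply(row_chunks, nrow, integer(1))

# apply() over a zero-row matrix still calls FUN once, on a dummy vector,
# so a FUN that cannot handle that input raises an error on the worker.
n_calls <- 0L
empty_result <- apply(x[integer(0), , drop = FALSE], 1L, function(row) {
  n_calls <<- n_calls + 1L
  sum(row)
})

print(n_rows_per_chunk)
print(n_calls)
```

If this is what happens on the workers that receive an empty chunk, the dummy row would match no rowID in x_complete, the rbind() passed to dist_q.matrix would have zero rows, and ref= 1L would then fall outside {1, nrow(x)}, which is consistent with the error message.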
If someone can provide a link to the source code for splitRows in the comments below, I will gladly update this answer. The code for parallel::clusterApply and parallel:::staticClusterApply is easy to find.