R parallel - parRapply not working properly

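I'm unit testing a package I have in development, and one of the tests fails. Specifically, I have a parallel and a non-parallel version of the same code. The non-parallel version works perfectly. The parallel version fails the unit test with a seemingly nonsensical error.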

## load my development package.
library(devtools) # for install_github
install_github("alexwhitworth/imputation")

## do some setup:
library(imputation)
library(kernlab)
library(parallel)


x1 <- matrix(rnorm(200), 20, 10)
x1[x1 > 1.25] <- NA
x3 <- create_canopies(x1, n_canopies= 5, q= 2)
prelim <- imputation:::impute_prelim(x3[[1]], parallel= TRUE, leave_cores= 1)

opt_h <- (4 * sd(x3[[1]][, -ncol(x3[[1]])], na.rm = TRUE)^5 / (3 * nrow(x3[[1]])))^(1/5)
kern <- rbfdot(opt_h)


## write 2 identical functions:
## one in parallel
## one not in parallel

foo_parallel <- function(x_missing, x_complete, k, q, leave_cores) {
  cl <- makeCluster(detectCores() - leave_cores)
  x_missing_imputed <- parRapply(cl= cl, x_missing, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))

    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ], 
                                                    x_complete[-x_comp_rowID,]), ref= 1L,  q= q)
    return(distances)
  }, x_complete= x_complete)
  stopCluster(cl)
  return(x_missing_imputed)
}

foo_nonparallel <- function(x_missing, x_complete, k, q) {
  x_missing_imputed <- t(apply(x_missing, 1, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))

    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ], 
                                                    x_complete[-x_comp_rowID,]), ref= 1L,  q= q)
    return(distances)
  }, x_complete= x_complete))
  return(x_missing_imputed)
}

## test them
foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 1) # fails
foo_nonparallel(prelim$x_missing, x3[[1]],k=3,q=2) # works

Error in checkForRemoteErrors(val) : 2 nodes produced errors; first error: ref must be an integer in {1, nrow(x)}.

As you can see, ref is explicitly set to ref= 1L, which lies in {1, nrow(x)}.

What is it about the interaction with library(parallel) that causes this error?

Edit - I'm on a Windows machine:

R> sessionInfo()
R version 3.2.2 (2015-08-14)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 7 x64 (build 7601) Service Pack 1

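I've figured out what causes the problem. It looks to me like a library(parallel) bug / edge case, specific to the parallel versions of the apply functions (parRapply in this case). Perhaps someone older and wiser can explain why library(parallel) behaves this way in this edge case.

The problem appears to be related to the number of tasks relative to the number of available workers. My machine has an 8-core processor. In this case there are 5 tasks (one for each row of prelim$x_missing).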

Granted, in typical use, I wouldn't be parallelizing work for 5 rows. This is just a unit test.

R> prelim$x_missing
              X1         X2         X3         X4         X5          X6         X7         X8          X9        X10 d_factor
6   6  0.2604170 -0.5966874         NA         NA -0.3013053  0.24313272  0.2836760  0.3977164 -0.60711109 -0.2929253        1
7   7 -0.8540576  0.1409047         NA  0.4801685 -0.9324517 -0.06487733 -0.2220201         NA  1.19077335 -0.3702607        2
8   8  0.5118453 -0.8750674         NA  0.1787238  0.6897163  0.20695122         NA -0.3488021  0.84200408 -0.4791230        1
12 12  0.3695746 -0.4919277 -1.2509180  1.1642152         NA  0.04018417         NA         NA -0.53436589 -1.5400345        2
15 15         NA -0.3608242 -0.6761515 -0.5366562  0.1763501          NA         NA  0.4967595  0.02635203 -0.6015536        1

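Note that I create the cluster via cl <- parallel::makeCluster(detectCores() - leave_cores), where detectCores() returns 8 on my current machine. The function takes an argument, leave_cores, giving the number of cores to leave free. When I create a cluster with more cores/nodes than there are rows in the use case, the function fails. When I create a cluster whose number of nodes is <= the number of rows, the function works: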

 # works : detectCores() == 8, 8 - 3 == 5 (number of rows / processes)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 3)
 [1] 1.0216313 0.7355635 0.9201501 0.6906554 0.6613939 0.3628872 0.9995641 0.8571252 0.9271800 0.9201501 0.9238215 0.9798824 0.9059506
[14] 0.6891484 1.0158223 0.5442953 0.6906554 0.9238215 0.8607280 0.5897955 1.1084943 0.8518322 0.9227102 0.6613939 0.9798824 0.8607280
[27] 0.9518105 0.9792209 1.1968528 0.4447104 0.3628872 0.9059506 0.5897955 0.9518105 1.1249624

# fails : 8-2 = 6; 6 > nrow(prelim$x_missing)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 2) 
Error in checkForRemoteErrors(val) : 
  one node produced an error: ref must be an integer in {1, nrow(x)}. 
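One way to sidestep the failure, given the observation above, is to never start more workers than there are rows. The following is only a minimal sketch: n_workers_for() is a hypothetical helper (it is not part of parallel or of the imputation package), and the n_cores argument exists only so the arithmetic can be checked without depending on the machine.

```r
library(parallel)

# Hypothetical helper: how many workers to start for `n_tasks` rows,
# leaving `leave_cores` cores free and never exceeding the number of rows.
n_workers_for <- function(n_tasks, leave_cores = 1L, n_cores = detectCores()) {
  if (is.na(n_cores)) n_cores <- 1L  # detectCores() can return NA
  as.integer(max(1L, min(n_cores - leave_cores, n_tasks)))
}

# 8 cores, 5 rows: the failing case (8 - 2 = 6 workers) is capped at 5.
print(n_workers_for(5, leave_cores = 2, n_cores = 8))
# Plenty of rows: the cap has no effect and 7 workers are used.
print(n_workers_for(100, leave_cores = 1, n_cores = 8))
```

Inside foo_parallel, this would replace the cluster creation with something like cl <- makeCluster(n_workers_for(nrow(x_missing), leave_cores)).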

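tl;dr

As stated in the R parallel vignette, detectCores simply detects the number of cores; quite reasonably, it makes no attempt to allocate tasks to workers intelligently.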

Function detectCores() tries to determine the number of CPU cores in the machine on which R is running: it has ways to do so on all known current R platforms. What exactly it measures is OS-specific: we try where possible to report the number of physical cores available. On Windows the default is to report the number of logical CPUs. On modern hardware (e.g. Intel Core i7) the latter may not be unreasonable as hyper-threading does give a significant extra throughput.

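I'm calling parallel::parRapply to do the computation. parRapply dispatches the work to the workers via the splitRows function, but splitRows does not appear to have any intelligence or error catching.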

R> parRapply
function (cl = NULL, x, FUN, ...) 
{
    cl <- defaultCluster(cl)
    do.call(c, clusterApply(cl = cl, x = splitRows(x, length(cl)), 
        fun = apply, MARGIN = 1L, FUN = FUN, ...), quote = TRUE)
}
<bytecode: 0x00000000380ca530>
<environment: namespace:parallel>

I can't find the source code for splitRows, but parallel::splitIndices looks similar:

R> parallel:::splitIndices
function (nx, ncl) 
{
    i <- seq_len(nx)
    if (ncl == 0L) 
        list()
    else if (ncl == 1L || nx == 1L) 
        list(i)
    else {
        fuzz <- min((nx - 1L)/1000, 0.4 * nx/ncl)
        breaks <- seq(1 - fuzz, nx + fuzz, length = ncl + 1L)
        structure(split(i, cut(i, breaks)), names = NULL)
    }
}
<bytecode: 0x00000000380a7828>
<environment: namespace:parallel>

In my unit test, this would execute as follows:

# all 8 cores:
nx <- 5; ncl <- 8
i <- seq_len(nx)
fuzz <- min((nx - 1L)/1000, 0.4 * nx / ncl)
breaks <- seq(1 - fuzz, nx + fuzz, length= ncl + 1L)
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1

[[2]]
integer(0)

[[3]]
[1] 2

[[4]]
integer(0)

[[5]]
[1] 3

[[6]]
[1] 4

[[7]]
integer(0)

[[8]]
[1] 5

Three of these are integer(0), which causes the failure further down the call stack.
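As a possible explanation of how an empty chunk turns into the "ref must be an integer" error: as far as I can tell from the apply source, when apply is handed a matrix with zero rows it still calls FUN once on a dummy vector, to work out the shape of the result. The sketch below is an assumption-laden illustration of that behaviour, not a trace of what parRapply does internally. If it holds, a worker given an empty chunk would run the row function with i[1] equal to 0, no row of x_complete would match, and dist_q.matrix would receive a matrix for which ref= 1L is out of range.

```r
# Count how often FUN runs when apply() is given a zero-row matrix.
calls <- 0L
seen <- NULL
empty_chunk <- matrix(numeric(0), nrow = 0, ncol = 3)

res <- apply(empty_chunk, 1, function(row) {
  calls <<- calls + 1L  # record that FUN was invoked
  seen <<- row          # keep the argument FUN received
  sum(row)
})

print(calls)  # number of times FUN was invoked
print(seen)   # the dummy row passed to FUN
print(res)    # the (empty) result returned by apply
```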

# 3 cores (just showing the return):
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1 2

[[2]]
[1] 3

[[3]]
[1] 4 5
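To make the comparison above reproducible without relying on the installed version of parallel, here is a sketch that copies the splitIndices logic printed earlier into a local function, counts the empty chunks, and drops them. The name split_indices_sketch is my own, and filtering out empty chunks is only one possible guard, not something parallel does itself.

```r
# Local copy of the splitIndices logic shown above (hypothetical name).
split_indices_sketch <- function(nx, ncl) {
  i <- seq_len(nx)
  if (ncl == 0L) {
    list()
  } else if (ncl == 1L || nx == 1L) {
    list(i)
  } else {
    fuzz <- min((nx - 1L) / 1000, 0.4 * nx / ncl)
    breaks <- seq(1 - fuzz, nx + fuzz, length.out = ncl + 1L)
    structure(split(i, cut(i, breaks)), names = NULL)
  }
}

chunks <- split_indices_sketch(5, 8)   # 5 rows spread over 8 workers
n_empty <- sum(lengths(chunks) == 0L)  # chunks that received no rows
non_empty <- Filter(length, chunks)    # keep only chunks with work to do

print(n_empty)
print(length(non_empty))
```

With 5 rows and 8 workers, three chunks come back empty, which matches the listing above; with 3 workers every chunk has at least one row.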

If anyone can provide a link to the source code for splitRows in the comments below, I'll gladly update this answer. The code for parallel::clusterApply and parallel:::staticClusterApply is easy to find.