Does parApply() divide the matrix and then process each part?

Suppose I have a parApply() call like this:

library(parallel)
cl <- makeCluster(5, type = "FORK")
parApply(cl = cl, X = my.mat, MARGIN = 1, FUN = myFun)

Here nrow(my.mat) is large, but myFun() is very fast to compute. Note that cl has 5 cores. I want to know how the parallelization is carried out.

Is my.mat divided into 5 submatrices, then each processed by a thread, then combined together after all threads are done? Or is it done by sending elements of my.mat to each thread one by one?

Here is how the R documentation explains it:

parLapply, parSapply, and parApply are parallel versions of lapply, sapply and apply. Chunks of computation are statically allocated to nodes using clusterApply. By default, the number of chunks is the same as the number of nodes. parLapplyLB, parSapplyLB are load-balancing versions, intended for use when applying FUN to different elements of X takes quite variable amounts of time, and either the function is deterministic or reproducible results are not required. Chunks of computation are allocated dynamically to nodes using clusterApplyLB.
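The static split described in that paragraph can be reproduced on the master, without starting any workers, with parallel::splitIndices(). My reading of the parallel sources is that parLapply(), which parApply() delegates to, splits its input with this helper, so treat that link as an assumption; the 20 rows and 5 nodes are illustrative values matching the question:

```r
library(parallel)

# Illustrative sizes: 20 matrix rows, 5 worker nodes.
n_rows  <- 20
n_nodes <- 5

# Divide the row indices into n_nodes contiguous, near-equal groups,
# i.e. one chunk per node as in the default static allocation.
row_chunks <- splitIndices(n_rows, n_nodes)
str(row_chunks)
# 5 chunks of 4 consecutive rows: 1:4, 5:8, 9:12, 13:16, 17:20
```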

Note that some of this changed in R 3.5.0:

From R 3.5.0, the default number of chunks is twice the number of nodes. Before R 3.5.0, the (fixed) number of chunks was the same as the number of nodes. As for clusterApplyLB, with load balancing the node that executes a particular job is non-deterministic and simulations that assign RNG streams to nodes will not be reproducible.
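For the load-balancing versions the chunk count therefore depends on chunk.size. The sketch below computes both cases with splitIndices(). The rule "number of chunks = ceiling(n / chunk.size) when chunk.size is given" is my reading of the parallel sources, not something the quoted help text states, and the sizes are illustrative:

```r
library(parallel)

n_items <- 20
n_nodes <- 5

# Default for the *LB functions since R 3.5.0: twice as many chunks as nodes.
lb_default <- splitIndices(n_items, 2 * n_nodes)
print(lengths(lb_default))   # 10 chunks of 2 items each

# Assumed rule with an explicit chunk.size: ceiling(n / chunk.size) chunks,
# so chunk.size = 1 yields one chunk per element.
chunk_size <- 1
lb_single <- splitIndices(n_items, ceiling(n_items / chunk_size))
print(length(lb_single))     # 20
```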

clusterApply calls fun on the first node with arguments x[[1]] and ..., on the second node with x[[2]] and ..., and so on, recycling nodes as needed.
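The round-robin recycling is easy to observe by having each job report the process id of the worker that ran it. This sketch assumes a 2-node PSOCK cluster (the default type, which also works on Windows) rather than the 5-node FORK cluster from the question:

```r
library(parallel)

cl <- makeCluster(2)   # assumption: 2 PSOCK workers

# Six jobs on two nodes; each job returns its item and the worker's pid.
res <- clusterApply(cl, 1:6, function(i) c(item = i, pid = Sys.getpid()))
stopCluster(cl)

pids <- vapply(res, function(r) r[["pid"]], integer(1))
# Nodes are reused in order: items 1, 3, 5 ran on the first node,
# items 2, 4, 6 on the second.
print(pids)
```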

clusterApplyLB works slightly differently:

clusterApplyLB is a load balancing version of clusterApply. If the length n of x is not greater than the number of nodes p, then a job is sent to n nodes. Otherwise the first p jobs are placed in order on the p nodes. When the first job completes, the next job is placed on the node that has become free; this continues until all jobs are complete. Using clusterApplyLB can result in better cluster utilization than using clusterApply, but increased communication can reduce performance. Furthermore, the node that executes a particular job is non-deterministic. This means that simulations that assign RNG streams to nodes will not be reproducible.
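To watch the dynamic placement, give the jobs uneven durations. The durations below are made up: with one slow job and several quick ones, the worker that picks up the slow job will usually run nothing else, although, as the help page warns, the exact assignment is not reproducible. Results still come back in the order of x. Again this assumes 2 PSOCK workers:

```r
library(parallel)

cl <- makeCluster(2)   # assumption: 2 PSOCK workers

# Hypothetical job durations in seconds: one slow job, five quick ones.
durations <- c(1, rep(0.05, 5))

res <- clusterApplyLB(cl, seq_along(durations), function(i, d) {
  Sys.sleep(d[i])                 # simulate work of uneven length
  c(job = i, pid = Sys.getpid())
}, d = durations)
stopCluster(cl)

job_ids <- vapply(res, function(r) r[["job"]], integer(1))
pids    <- vapply(res, function(r) r[["pid"]], integer(1))
print(job_ids)       # always 1..6: results keep the order of x
print(table(pids))   # jobs per worker: timing-dependent
```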

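So when you use parApply, your matrix is split into 5 chunks (one per node, the default for the static functions), each chunk is processed by one of the cores, and the results are combined once all chunks are done. With the load-balancing functions (parLapplyLB, parSapplyLB) work is handed out dynamically instead: elements are sent to the cores in chunks (one element at a time with chunk.size = 1), and as soon as a core finishes its task, it is given the next one.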
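In other words, for the static case the answer to the question is "split into submatrices, process each, then combine". A simplified, hypothetical re-implementation makes that explicit. manual_par_apply() and rows_apply() are illustrative names; the sketch only handles MARGIN = 1 with a FUN that returns one value per row, and it is not how parApply() is literally written. It assumes 2 PSOCK workers:

```r
library(parallel)

# Worker-side function: apply f to every row of one submatrix.
rows_apply <- function(sub, f) apply(sub, 1, f)

# Hypothetical stand-in for parApply(cl, m, 1, f) with scalar-valued f.
manual_par_apply <- function(cl, m, f) {
  # One contiguous block of rows per node, like the default static split.
  idx    <- splitIndices(nrow(m), length(cl))
  blocks <- lapply(idx, function(i) m[i, , drop = FALSE])
  # Each node processes its whole submatrix; the pieces are then concatenated.
  unlist(clusterApply(cl, blocks, rows_apply, f = f))
}

cl <- makeCluster(2)   # assumption: 2 PSOCK workers
my.mat <- matrix(c(1:20, rep(0, 20)), ncol = 2)
out <- manual_par_apply(cl, my.mat, sum)
stopCluster(cl)
print(out)   # same as apply(my.mat, 1, sum)
```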

Here is the output of the following code:

library(parallel)


my.mat <- matrix(c(1:20,rep(0,20)), ncol=2)
head(my.mat)
#      [,1] [,2]
# [1,]    1    0
# [2,]    2    0
# [3,]    3    0
# [4,]    4    0
# [5,]    5    0
# [6,]    6    0

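# FORK clusters are only available on Unix-alikes (not on Windows).
cl <- makeCluster(5, "FORK")
# Each worker returns a string with the row sum and its own process id.
# print() output on the workers is not shown; below are the returned strings.
parApply(cl = cl, X = my.mat, MARGIN = 1,
         FUN = function(x){print(paste("sum= ", sum(x), "  pid=",Sys.getpid()))})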
# [1] "sum=  1   pid= 42569" 
# [2] "sum=  2   pid= 42569" 
# [3] "sum=  3   pid= 42569" 
# [4] "sum=  4   pid= 42569" 
# [5] "sum=  5   pid= 42570" 
# [6] "sum=  6   pid= 42570" 
# [7] "sum=  7   pid= 42570" 
# [8] "sum=  8   pid= 42570" 
# [9] "sum=  9   pid= 42571" 
# [10] "sum=  10   pid= 42571"
# [11] "sum=  11   pid= 42571"
# [12] "sum=  12   pid= 42571"
# [13] "sum=  13   pid= 42572"
# [14] "sum=  14   pid= 42572"
# [15] "sum=  15   pid= 42572"
# [16] "sum=  16   pid= 42572"
# [17] "sum=  17   pid= 42573"
# [18] "sum=  18   pid= 42573"
# [19] "sum=  19   pid= 42573"
# [20] "sum=  20   pid= 42573"


stopCluster(cl)
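Reading pids off the printed strings gets tedious for large matrices; returning only the pid and tabulating it shows the per-worker row counts directly. This sketch assumes 2 PSOCK workers instead of 5 FORK workers, so the default static split gives 2 chunks of 10 rows:

```r
library(parallel)

cl <- makeCluster(2)   # assumption: 2 PSOCK workers
my.mat <- matrix(c(1:20, rep(0, 20)), ncol = 2)

# Return only the worker's pid for each row, then count rows per worker.
pids <- parApply(cl = cl, X = my.mat, MARGIN = 1, FUN = function(x) Sys.getpid())
stopCluster(cl)

rows_per_worker <- table(pids)
print(rows_per_worker)   # 2 workers, 10 rows each
```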

Note the difference in the output when I use parLapplyLB with a chunk size of 1 (look at how the pid values are distributed):

mylist <- 1:20
cl <- makeCluster(5, "FORK")
parLapplyLB(cl = cl, X = mylist,function(x){print(paste("sum= ", sum(x), "  pid=",Sys.getpid()))}, chunk.size = 1)
# [[1]]
# [1] "sum=  1   pid= 64019"
# 
# [[2]]
# [1] "sum=  2   pid= 64020"
# 
# [[3]]
# [1] "sum=  3   pid= 64021"
# 
# [[4]]
# [1] "sum=  4   pid= 64022"
# 
# [[5]]
# [1] "sum=  5   pid= 64023"
# 
# [[6]]
# [1] "sum=  6   pid= 64019"
# 
# [[7]]
# [1] "sum=  7   pid= 64020"
# 
# [[8]]
# [1] "sum=  8   pid= 64019"
# 
# [[9]]
# [1] "sum=  9   pid= 64020"
# 
# [[10]]
# [1] "sum=  10   pid= 64019"
# . . .
stopCluster(cl)
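Since myFun() in the question is very fast, the default (few, large chunks) should keep communication to a minimum. If you want finer-grained chunks while keeping static allocation, parApply() itself accepts chunk.size from R 3.5.0 on. A sketch, again assuming 2 PSOCK workers: with chunk.size = 2 the 20 rows become 10 chunks that are handed out round-robin, so the allocation remains deterministic:

```r
library(parallel)

cl <- makeCluster(2)   # assumption: 2 PSOCK workers
my.mat <- matrix(c(1:20, rep(0, 20)), ncol = 2)

# 10 static chunks of 2 rows each, distributed round-robin over 2 nodes.
pids <- parApply(cl, my.mat, 1, function(x) Sys.getpid(), chunk.size = 2)
stopCluster(cl)

print(table(pids))            # still 10 rows per worker
print(matrix(pids, nrow = 2)) # each column is one chunk; workers alternate
```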