Using clustermq R package as a parallel backend for foreach

I have started using the clustermq package as a parallel backend for drake pipelines and have been impressed by the performance improvements I've observed. I'm interested in evaluating clustermq / rzmq in settings outside of drake, but I can't seem to get the foreach example listed in the User Guide (in the subsection titled "As parallel foreach backend") to work. What am I missing here?

In the example below, run on my 4-core machine, I expected the following code to finish in close to 5 seconds, but it takes close to 20 seconds. When I run some heavier processing with similar code, I observe only one core doing any significant work.

library(foreach)
(n_cores <- parallel::detectCores())
#> [1] 4
clustermq::register_dopar_cmq(n_jobs = n_cores)
system.time(foreach(i = seq_len(n_cores)) %dopar% Sys.sleep(5))
#> Submitting 4 worker jobs (ID: 6856) ...
#>    user  system elapsed 
#>   0.118   0.022  20.187

The good news is that the R console reports that the work was dispatched: Submitting 4 worker jobs (ID: 6856) ...
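For reference, here is a minimal sketch of the same test written against clustermq's native Q() interface rather than foreach (Q() is clustermq's own map-style API; the n_cores value is assumed from above):

library(clustermq)
n_cores <- parallel::detectCores()
# dispatch one 5-second sleep per requested worker; if no
# clustermq.scheduler option is set, this also falls back to
# sequential local processing and takes ~ n_cores * 5 seconds
Q(function(i) Sys.sleep(5), i = seq_len(n_cores), n_jobs = n_cores)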

To see how the CPU utilisation gets accounted for (as in unix.time()), test the following (the online R code runners cannot load library(foreach) / library(doParallel), nor show the difference live online, but the code should make it clear how to run this on a properly configured localhost R system):

factor = function( x ) {                 # a recursive factorial (kept under its original name)
         if (      x < 3 ) return( x )
         else              return( x * factor( x - 1 ) )
}

load   = function( n ) {                 # a CPU-bound, pure-R workload, repeated n times
         for ( l in 1:n ) {
               base <- factor( 1078 )
               for ( k in 2:1111 ){
                     base <- ( base + factor( k ) ) / factor( k )
               }
         }
}

op <- options( digits.secs = 6 )         # show sub-second precision in Sys.time() stamps

Sys.time()
system.time(                                           load( 100 ) )   # pure-serial reference run
Sys.time()
system.time( foreach( i = seq_len( n_cores ) ) %dopar% load( 100 ) )   # the %dopar%-decorated run

Sys.time()

# ___________________________________________________________________
# tio.run
# ___________________________________________________________________
#
# factor(   78 )                                                     user: 0.016 system: 0.001 elapsed: 0.018
# factor(  178 )                                                     user: 0.016 system: 0.002 elapsed: 0.018 Real: 0.271 User: 0.180 Sys.: 0.059 CPU share: 88.04 %
# factor( 1078 )                                                     user: 0.017 system: 0.005 elapsed: 0.021 Real: 0.236 User: 0.188 Sys.: 0.047 CPU share: 99.94 %
# factor( 2178 )                                  Timing stopped at: user: 0.017 system: 0.005 elapsed: 0.023 Error: C stack usage  7972624 is too close to the limit : Execution halted
#
# load(   4 )                                                        user: 2.230 system: 0.005 elapsed: 2.253 Real: 2.483 User: 2.398 Sys.: 0.048 CPU share: 98.25 %
# load(   8 )                                                        user: 4.435 system: 0.013 elapsed: 4.481 Real: 4.753 User: 4.614 Sys.: 0.067 CPU share: 98.49 %
# load(  16 )                                                        user: 8.620 system: 0.009 elapsed: 8.693 Real: 8.932 User: 8.775 Sys.: 0.059 CPU share: 98.91 %
# load(  32 )                                                        user:20.522 system: 0.018 elapsed:20.892 Real:21.167 User:20.699 Sys.: 0.077 CPU share: 98.15 %
# load(  64 )                                                        user:35.212 system: 0.007 elapsed:35.611 Real:35.861 User:35.374 Sys.: 0.053 CPU share: 98.78 %
# load( 100 )                                                        user:54.721 system: 0.009 elapsed:55.128 Real:55.377 User:54.875 Sys.: 0.065 CPU share: 99.21 %
# load( 104 )                                                        user:57.087 system: 0.012 elapsed:57.532 Real:57.743 User:57.245 Sys.: 0.060 CPU share: 99.24 %
#
# ___________________________________________________________________
# https://www.jdoodle.com/execute-r-online/
# ___________________________________________________________________
#
# factor(   78 ) CPU Time: 0.27 sec(s), Mem: 52532 kB(s)             user: 0.028 system: 0.000 elapsed: 0.030
# factor(  178 ) CPU Time: 0.28 sec(s), Mem: 53212 kB(s)             user: 0.024 system: 0.004 elapsed: 0.028
# factor( 1078 ) CPU Time: 0.27 sec(s), Mem: 56884 kB(s)             user: 0.024 system: 0.008 elapsed: 0.037
# factor( 2178 ) CPU Time: 0.28 sec(s), Mem: 60768 kB(s) stopped at: user: 0.028 system: 0.004 elapsed: 0.032 Error: C stack usage  7971588 is too close to the limit : Execution halted
#
#
# load(  2 ) executed in  4.025 sec(s)                               user: 2.584 system: 0.008 elapsed: 2.630
# load(  4 ) executed in  6.005 sec(s)                               user: 4.704 system: 0.016 elapsed: 4.791
# load(  8 ) executed in 11.036 sec(s)                               user: 9.372 system: 0.028 elapsed: 9.572
# load( 16 ) executed in 20.085 sec(s)                               user:18.388 system: 0.068 elapsed:18.789
# load( 32 ) executed in 39.093 sec(s)                               user:37.664 system: 0.060 elapsed:37.974


# library(foreach)
# (n_cores <- parallel::detectCores())
## > [1] 4
# clustermq::register_dopar_cmq( n_jobs = n_cores ) ############################ REGISTER a parallel-backend
# system.time( foreach( i = seq_len( n_cores ) ) %dopar% load( 200 ) )
## > Submitting 4 worker jobs (ID: 6856) ...
## >    user  system elapsed 
## >   0.118   0.022  20.187
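For comparison on a properly configured localhost (which the online runners above could not demonstrate), registering a fork-based doParallel backend would look roughly like this. This is a sketch, not a run from the original test, and it assumes the factor() / load() definitions above:

library(foreach)
library(doParallel)

n_cores <- parallel::detectCores()
registerDoParallel( cores = n_cores )    # fork-based workers on unix-alike systems
# with n_cores truly independent workers, the elapsed time should approach
# the serial figure divided by n_cores, plus the workers' setup overhead
system.time( foreach( i = seq_len( n_cores ) ) %dopar% load( 100 ) )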

The author of the clustermq package kindly responded to the GitHub issue I posted about this. In short, there is an option, clustermq.scheduler, that can be set to specify the type of scheduling clustermq uses. In my case, because this option was not set, clustermq defaulted to local (i.e. sequential) scheduling. To get parallel processing on the local machine, the clustermq.scheduler option can be set to "multicore". Putting it all together, the result is as follows.

library(foreach)
(n_cores <- parallel::detectCores())
#> [1] 4
clustermq::register_dopar_cmq(n_jobs = n_cores)
options(clustermq.scheduler = "multicore")
system.time(foreach(i = seq_len(n_cores)) %dopar% Sys.sleep(5))
#> Submitting 4 worker jobs (ID: 7206) ...
#> Running 4 calculations (1 objs/0 Mb common; 1 calls/chunk) ...
#> Master: [5.6s 3.6% CPU]; Worker: [avg 1.3% CPU, max 2475916.0 Mb]
#>    user  system elapsed 
#>   0.188   0.065   5.693
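A closing note beyond what the thread itself covers: to avoid setting the scheduler by hand in every session, the clustermq documentation suggests putting the option into ~/.Rprofile, e.g.:

# ~/.Rprofile -- read at R startup
options(clustermq.scheduler = "multicore")   # local parallel workers; "LOCAL" (the fallback) runs sequentially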