Using the clustermq R package as a parallel backend for foreach
I have started using the clustermq package as a parallel backend for drake pipelines and have been impressed by the performance improvements I have observed. I am interested in evaluating clustermq / rzmq in settings beyond drake, but I cannot seem to get the foreach example listed in the User Guide (in the subsection titled "As parallel foreach backend") to work. What am I missing here?
In the example below, on my 4-core machine, I would expect the following code to run in roughly 5 seconds, but it runs in closer to 20 seconds. When I run some heavy processing with similar code, I observe only one core doing significant work.
library(foreach)
(n_cores <- parallel::detectCores())
#> [1] 4
clustermq::register_dopar_cmq(n_jobs = n_cores)
system.time(foreach(i = seq_len(n_cores)) %dopar% Sys.sleep(5))
#> Submitting 4 worker jobs (ID: 6856) ...
#> user system elapsed
#> 0.118 0.022 20.187
The good news is that the R console reports the units of work were indeed dispatched onto 4 worker jobs (ID: 6856) ...
To see how the CPU usage gets accounted (as in unix.time()), test the following. An online R code sandbox cannot load library(foreach) or library(doParallel), nor show the difference live, but the code should make it clear how to do this on a properly configured localhost R system:
# recursive factorial-style function (note: this masks base::factor)
factor <- function( x ) {
    if ( x < 3 ) return( x )
    else return( x * factor( x - 1 ) )
}
# synthetic CPU load: n rounds of repeated factor() arithmetic (masks base::load)
load <- function( n ) {
    for ( l in 1:n ) {
        base <- factor( 1078 )
        for ( k in 2:1111 ) {
            base <- ( base + factor( k ) ) / factor( k )
        }
    }
}
op <- options( digits.secs = 6 )
Sys.time()
system.time( load( 100 ) )
Sys.time()
system.time( foreach( i = seq_len( n_cores ) ) %dopar% load( 100 ) )
Sys.time()
# ___________________________________________________________________
# tio.run
# ___________________________________________________________________
#
# factor( 78 ) user: 0.016 system: 0.001 elapsed: 0.018
# factor( 178 ) user: 0.016 system: 0.002 elapsed: 0.018 Real: 0.271 User: 0.180 Sys.: 0.059 CPU share: 88.04 %
# factor( 1078 ) user: 0.017 system: 0.005 elapsed: 0.021 Real: 0.236 User: 0.188 Sys.: 0.047 CPU share: 99.94 %
# factor( 2178 ) Timing stopped at: user: 0.017 system: 0.005 elapsed: 0.023 Error: C stack usage 7972624 is too close to the limit : Execution halted
#
# load( 4 ) user: 2.230 system: 0.005 elapsed: 2.253 Real: 2.483 User: 2.398 Sys.: 0.048 CPU share: 98.25 %
# load( 8 ) user: 4.435 system: 0.013 elapsed: 4.481 Real: 4.753 User: 4.614 Sys.: 0.067 CPU share: 98.49 %
# load( 16 ) user: 8.620 system: 0.009 elapsed: 8.693 Real: 8.932 User: 8.775 Sys.: 0.059 CPU share: 98.91 %
# load( 32 ) user:20.522 system: 0.018 elapsed:20.892 Real:21.167 User:20.699 Sys.: 0.077 CPU share: 98.15 %
# load( 64 ) user:35.212 system: 0.007 elapsed:35.611 Real:35.861 User:35.374 Sys.: 0.053 CPU share: 98.78 %
# load( 100 ) user:54.721 system: 0.009 elapsed:55.128 Real:55.377 User:54.875 Sys.: 0.065 CPU share: 99.21 %
# load( 104 ) user:57.087 system: 0.012 elapsed:57.532 Real:57.743 User:57.245 Sys.: 0.060 CPU share: 99.24 %
#
# ___________________________________________________________________
# https://www.jdoodle.com/execute-r-online/
# ___________________________________________________________________
#
# factor( 78 ) CPU Time: 0.27 sec(s), Mem: 52532 kB(s) user: 0.028 system: 0.000 elapsed: 0.030
# factor( 178 ) CPU Time: 0.28 sec(s), Mem: 53212 kB(s) user: 0.024 system: 0.004 elapsed: 0.028
# factor( 1078 ) CPU Time: 0.27 sec(s), Mem: 56884 kB(s) user: 0.024 system: 0.008 elapsed: 0.037
# factor( 2178 ) CPU Time: 0.28 sec(s), Mem: 60768 kB(s) stopped at: user: 0.028 system: 0.004 elapsed: 0.032 Error: C stack usage 7971588 is too close to the limit : Execution halted
#
#
# load( 2 ) executed in 4.025 sec(s) user: 2.584 system: 0.008 elapsed: 2.630
# load( 4 ) executed in 6.005 sec(s) user: 4.704 system: 0.016 elapsed: 4.791
# load( 8 ) executed in 11.036 sec(s) user: 9.372 system: 0.028 elapsed: 9.572
# load( 16 ) executed in 20.085 sec(s) user:18.388 system: 0.068 elapsed:18.789
# load( 32 ) executed in 39.093 sec(s) user:37.664 system: 0.060 elapsed:37.974
# library(foreach)
# (n_cores <- parallel::detectCores())
## > [1] 4
# clustermq::register_dopar_cmq( n_jobs = n_cores ) ############################ REGISTER a parallel-backend
# system.time( foreach( i = seq_len( n_cores ) ) %dopar% load( 200 ) )
## > Submitting 4 worker jobs (ID: 6856) ...
## > user system elapsed
## > 0.118 0.022 20.187
The author of the clustermq package kindly responded to the GitHub issue I filed about this. In short, there is an option, clustermq.scheduler, that specifies the kind of scheduling clustermq uses. In my case, because that option was unset, clustermq defaulted to local (i.e. sequential) scheduling. To get parallel processing on the local machine, the clustermq.scheduler option can be set to "multicore". So, all together, the result is as follows.
library(foreach)
(n_cores <- parallel::detectCores())
#> [1] 4
clustermq::register_dopar_cmq(n_jobs = n_cores)
options(clustermq.scheduler = "multicore")
system.time(foreach(i = seq_len(n_cores)) %dopar% Sys.sleep(5))
#> Submitting 4 worker jobs (ID: 7206) ...
#> Running 4 calculations (1 objs/0 Mb common; 1 calls/chunk) ...
#> Master: [5.6s 3.6% CPU]; Worker: [avg 1.3% CPU, max 2475916.0 Mb]
#> user system elapsed
#> 0.188 0.065 5.693
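To avoid having to remember this in every session, one option (a suggestion of mine, not something from the User Guide) is to set it once in your startup file:

```r
# e.g. in ~/.Rprofile: default clustermq to local multicore scheduling
options(clustermq.scheduler = "multicore")
```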