for vs lapply and time increase due to calling a function in R
I tested the code below (on a 300K-row data frame) to figure out which is the fastest way to parallelize this in R: a for loop or lapply.
Q1. Is it always true (based on the elapsed-time comparison) that a parallel lapply is faster than a parallel for loop? From various posts online I have seen both "Duh! lapply is always faster" and "Depending on your implementation, a for loop can be faster".
Q2. More surprisingly, running essentially the same code by calling a function (just to make the code look cleaner) is much slower. Am I benchmarking these correctly?
I see a similar trend with 30K rows. Depending on the answers, I will also look at how well the parallelization scales with more cores.
Thanks.
#Results:
[1] 300000 3
[1] "For loop all conditions"
user system elapsed
1040.232 8.767 1048.897
[1] "Parallel For loop all conditions"
user system elapsed
266.861 8.462 276.064
[1] "Lapply all conditions"
user system elapsed
66.364 0.014 66.369
[1] "ParLapply all conditions"
user system elapsed
0.413 0.113 25.890
[1] "Lapply all conditions call function"
user system elapsed
5293.981 223.524 5517.128
[1] "ParLapply all conditions call function"
user system elapsed
0.492 0.082 1949.433
[1] "For loop all conditions call function"
user system elapsed
10506.028 82.372 10587.585
[1] "Parallel For loop all conditions call function"
user system elapsed
585.387 29.322 2246.441
#Code:
d1 = c(1,2,-3)
d2 = c(1,-2,-2)
d3 = c(1,-2,-4)
d = data.frame(d1,d2,d3)
# making a big data frame for testing
s_df = d[rep(seq_len(nrow(d)), each=100000),]
correlThreshold = 0
total_numb_input_files = 3
rows_passing_consistency = c()
print("For loop all conditions")
system.time(
  for(idx in 1:nrow(s_df)){
    dfx = as.vector(unlist(s_df[idx, , drop=T]))
    rname = rownames(s_df)[idx]
    res = NULL
    #print(dfx)
    # count how many values in the row are above / below the threshold
    pos = sum(dfx > correlThreshold)
    neg = sum(dfx < correlThreshold)
    # keep the rowname only if all values fall on the same side of the threshold
    if((!is.na(pos)) && pos == (total_numb_input_files)){
      res = rname
    }
    if((!is.na(neg)) && neg == (total_numb_input_files)){
      res = rname
    }
    rows_passing_consistency = append(rows_passing_consistency, res)
  }
)
print("Parallel For loop all conditions")
library(doParallel)
cl<-makeCluster(4, type="FORK")
registerDoParallel(cl)
system.time(
  foreach(idx = 1:nrow(s_df), .combine = c) %dopar% {
    dfx = as.vector(unlist(s_df[idx, , drop=T]))
    rname = rownames(s_df)[idx]
    res = NULL
    #print(dfx)
    pos = sum(dfx > correlThreshold)
    neg = sum(dfx < correlThreshold)
    if((!is.na(pos)) && pos == (total_numb_input_files)){
      res = rname
    }
    if((!is.na(neg)) && neg == (total_numb_input_files)){
      res = rname
    }
    res
  }
)
stopCluster(cl)
print("Lapply all conditions")
system.time(
  lapply(1:nrow(s_df),
    function(idx, s_df){
      dfx = as.vector(unlist(s_df[idx, , drop=T]))
      rname = rownames(s_df)[idx]
      res = NULL
      #print(dfx)
      pos = sum(dfx > correlThreshold)
      neg = sum(dfx < correlThreshold)
      if((!is.na(pos)) && pos == (total_numb_input_files)){
        res = rname
      }
      if((!is.na(neg)) && neg == (total_numb_input_files)){
        res = rname
      }
      res
    },
    s_df
  )
)
print("ParLapply all conditions")
library(doParallel)
cl<-makeCluster(4, type="FORK")
#registerDoParallel(cl)
system.time(
  parLapply(cl, 1:nrow(s_df),
    function(idx, s_df){
      dfx = as.vector(unlist(s_df[idx, , drop=T]))
      rname = rownames(s_df)[idx]
      res = NULL
      #print(dfx)
      pos = sum(dfx > correlThreshold)
      neg = sum(dfx < correlThreshold)
      if((!is.na(pos)) && pos == (total_numb_input_files)){
        res = rname
      }
      if((!is.na(neg)) && neg == (total_numb_input_files)){
        res = rname
      }
      res
    },
    s_df
  )
)
stopCluster(cl)
# check whether all values in a row are on the same side of correlThreshold;
# note that here the row is looked up by its rowname, not by an integer index
calc_consistency = function(rname, s_df){
  dfx = as.vector(unlist(s_df[rname, , drop=T]))
  res = NULL
  #print(dfx)
  pos = sum(dfx > correlThreshold)
  neg = sum(dfx < correlThreshold)
  if((!is.na(pos)) && pos == (total_numb_input_files)){
    res = rname
  }
  if((!is.na(neg)) && neg == (total_numb_input_files)){
    res = rname
  }
  return(res)
}
print("Lapply all conditions call function")
system.time(lapply(rownames(s_df), calc_consistency, s_df))
print("ParLapply all conditions call function")
library(doParallel)
cl<-makeCluster(4, type="FORK")
#registerDoParallel(cl)
system.time(parLapply(cl, rownames(s_df), calc_consistency, s_df))
stopCluster(cl)
print("For loop all conditions call function")
system.time(
  for(rname in rownames(s_df)){
    rows_passing_consistency = append(rows_passing_consistency, calc_consistency(rname, s_df))
  }
)
print("Parallel For loop all conditions call function")
library(doParallel)
cl<-makeCluster(4, type="FORK")
registerDoParallel(cl)
system.time(
  foreach(rname = rownames(s_df), .combine = c) %dopar% {
    calc_consistency(rname, s_df)
  }
)
stopCluster(cl)
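As an aside (not part of the timings above): the per-row check being benchmarked here, keeping a row when all of its values are above correlThreshold or all of them are below it, can also be written without any row-wise iteration. A minimal vectorized sketch, reusing s_df, correlThreshold and total_numb_input_files from above (rows_vectorized is just a name picked for the result):
# Vectorized sketch (not part of the benchmarked runs above)
pos_cnt = rowSums(s_df > correlThreshold)   # per-row count of values above the threshold
neg_cnt = rowSums(s_df < correlThreshold)   # per-row count of values below the threshold
rows_vectorized = rownames(s_df)[which(pos_cnt == total_numb_input_files |
                                       neg_cnt == total_numb_input_files)]
which() drops NA comparisons, which mirrors the !is.na() guards in the loop versions.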
So it turns out that the main difference in speed comes from passing row indexes versus rownames to the applied function. I tried (l)apply with both the inline and the separate function-call versions, with and without parallelization. With apply, the inline version is not much different from the function-call version, and parallelization helps about equally in both cases. The main lag comes from passing rownames rather than integer indexes, although I am not sure why that happens.
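One plausible explanation (my best guess, I have not profiled `[.data.frame` itself): subsetting a data frame by an integer index goes straight to that row, whereas subsetting by a character rowname makes R match the supplied name against the full vector of 300K rownames on every single call, so the per-iteration cost blows up. The two forms can be timed in isolation with something like this (illustrative values; exact numbers will differ by machine):
# Isolate just the row-subsetting step
idx   = 150000                  # arbitrary row position
rname = rownames(s_df)[idx]     # the corresponding rowname
system.time(for (i in 1:100) s_df[idx, ])    # integer index: direct lookup
system.time(for (i in 1:100) s_df[rname, ])  # character rowname: matched against all rownames
If that is indeed the cause, passing integer indexes (or avoiding per-row data.frame subsetting altogether, as in the vectorized sketch above) sidesteps the lookup.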