对于 vs lapply 和由于在 R 中调用函数而增加的时间

for vs lapply and time increase due to calling a function in R

我测试了以下代码(在 300K 行 DF 上)以确定哪种是 R 中并行化的最快方法(for 循环与 lapply)。

Q1. 这是否总是正确的(基于经过的时间比较)并行 lapply 比并行 for 循环更快?根据不同的在线帖子,我看到人们说 "Duh! Lapply is always faster" 或 "Depending on your implementation for loop can be faster"。

Q2.更让人吃惊的是运行类似的代码通过调用一个函数(为了让代码看起来更简洁)的速度要慢得多。我是否正确地对它们进行了基准测试?

我看到 30K 行有类似的趋势。根据答案,我将查看并行化是否随着内核的增加而扩展得很好。

谢谢。

#Results:
[1] 300000      3
[1] "For loop all conditions"
    user   system  elapsed 
1040.232    8.767 1048.897 
[1] "Parallel For loop all conditions"
   user  system elapsed 
266.861   8.462 276.064 
[1] "Lapply all conditions"
   user  system elapsed 
 66.364   0.014  66.369 
[1] "ParLapply all conditions"
   user  system elapsed 
  0.413   0.113  25.890 
[1] "Lapply all conditions call function"
    user   system  elapsed 
5293.981  223.524 5517.128 
[1] "ParLapply all conditions call function"
    user   system  elapsed 
   0.492    0.082 1949.433 
[1] "For loop all conditions call function"
     user    system   elapsed 
10506.028    82.372 10587.585 
[1] "Parallel For loop all conditions call function"
    user   system  elapsed 
 585.387   29.322 2246.441 

#Code:  
d1 = c(1,2,-3)
d2 = c(1,-2,-2)
d3 = c(1,-2,-4)
d = data.frame(d1,d2,d3)
# making a big data frame for testing
s_df = d[rep(seq_len(nrow(d)), each=100000),]


correlThreshold = 0
total_numb_input_files = 3
rows_passing_consistency = c()


print("For loop all conditions")
system.time(
        for(idx in 1:nrow(s_df)){
            dfx = as.vector(unlist(s_df[idx, ,drop=T]))
            rname = rownames(s_df)[idx]
            res = NULL
            #print(dfx)
            pos = sum(dfx > correlThreshold)
            neg = sum(dfx < correlThreshold)

            if((!is.na(pos)) && pos == (total_numb_input_files)){
                res = rname
            }

            if((!is.na(neg)) && neg == (total_numb_input_files)){
                res = rname
            }
            rows_passing_consistency = append(rows_passing_consistency, res)
        }
)



print("Parallel For loop all conditions")
library(doParallel)
cl<-makeCluster(4, type="FORK")
registerDoParallel(cl)
system.time(
        foreach(idx = 1:nrow(s_df), .combine = c) %dopar% {
            dfx = as.vector(unlist(s_df[idx, ,drop=T]))
            rname = rownames(s_df)[idx]
            res = NULL
            #print(dfx)
            pos = sum(dfx > correlThreshold)
            neg = sum(dfx < correlThreshold)

            if((!is.na(pos)) && pos == (total_numb_input_files)){
                res = rname
            }
            if((!is.na(neg)) && neg == (total_numb_input_files)){
                 res = rname
            }
            res
        }
)
stopCluster(cl)



print("Lapply all conditions")
system.time(
  lapply(1:nrow(s_df) , 
        function(idx, s_df){
            dfx = as.vector(unlist(s_df[idx, ,drop=T]))
            rname = rownames(s_df)[idx]
            res = NULL
            #print(dfx)
            pos = sum(dfx > correlThreshold)
            neg = sum(dfx < correlThreshold)

            if((!is.na(pos)) && pos == (total_numb_input_files)){
                res = rname
            }

            if((!is.na(neg)) && neg == (total_numb_input_files)){
                res = rname
            }
            res         
        }
    , s_df
  )
)



print("ParLapply all conditions")
library(doParallel)
cl<-makeCluster(4, type="FORK")
#registerDoParallel(cl)
system.time(
  parLapply(cl, 1:nrow(s_df) , 
        function(idx, s_df){
            dfx = as.vector(unlist(s_df[idx, ,drop=T]))
            rname = rownames(s_df)[idx]
            res = NULL
            #print(dfx)
            pos = sum(dfx > correlThreshold)
            neg = sum(dfx < correlThreshold)

            if((!is.na(pos)) && pos == (total_numb_input_files)){
                res = rname
            }

            if((!is.na(neg)) && neg == (total_numb_input_files)){
                res = rname
            }
            res         
        }
    , s_df
  )
)
stopCluster(cl)





calc_consistency = function(rname, s_df){
            dfx = as.vector(unlist(s_df[rname, ,drop=T]))
            res = NULL
            #print(dfx)
            pos = sum(dfx > correlThreshold)
            neg = sum(dfx < correlThreshold)

            if((!is.na(pos)) && pos == (total_numb_input_files)){
                res = rname
            }
            if((!is.na(neg)) && neg == (total_numb_input_files)){
                 res = rname
            }
            return(res)
}

print("Lapply all conditions call function")
system.time(lapply(rownames(s_df), calc_consistency, s_df))

print("ParLapply all conditions call function")
library(doParallel)
cl<-makeCluster(4, type="FORK")
#registerDoParallel(cl)
system.time(parLapply(cl, rownames(s_df), calc_consistency, s_df))
stopCluster(cl)

print("For loop all conditions call function")
system.time(
for(rname in rownames(s_df)){
        rows_passing_consistency = append(rows_passing_consistency, calc_consistency(rname, s_df))
}
)

print("Parallel For loop all conditions call function")
library(doParallel)
cl<-makeCluster(4, type="FORK")
registerDoParallel(cl)
system.time(
foreach(rname=rownames(s_df), .combine = c) %dopar% {
        calc_consistency(rname, s_df)
}
)
stopCluster(cl)

所以事实证明,速度的主要差异是由于将 "row indexes" 与 "rownames" 传递给应用函数造成的。我尝试将 (l)apply 与内联和单独的函数调用一起使用,有和没有并行化。使用 apply 进行内联调用与函数调用没有太大区别。并行化也同样有效。主要的时间延迟是由于传递索引与行名造成的,尽管我不确定为什么会发生这种情况。