R: Understanding the impact of "makePSOCKcluster"

I am working with the R programming language. I recently came across the following function, makePSOCKcluster, which can apparently be used to "speed up" the time it takes R to perform certain tasks (https://www.rdocumentation.org/packages/future/versions/1.19.1/topics/makeClusterPSOCK).

Based on some posts I have seen, makePSOCKcluster seems to act like a "wrapper" in which you place the code you want to "speed up", for example:

library(doParallel)
library(future)

cl <- makePSOCKcluster(6) # 6 cpu cores out of 8

registerDoParallel(cl)

### enter your code here

stopCluster(cl) # when finished

I tried to adapt this setup to speed up an R program I am working with:

cl <- makePSOCKcluster(6) # 6 cpu cores out of 8

registerDoParallel(cl)

### START

library(dplyr)
library(data.table)

results_table <- data.frame()

grid_function <- function(train_data, random_1, random_2, random_3, random_4, split_1, split_2, split_3) {
    
    
    
    #bin data according to random criteria
    train_data <- train_data %>% mutate(cat = ifelse(a1 <= random_1 & b1 <= random_3, "a", ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")))
    
    train_data$cat = as.factor(train_data$cat)
    
    #new splits
    a_table = train_data %>%
        filter(cat == "a") %>%
        select(a1, b1, c1, cat)
    
    b_table = train_data %>%
        filter(cat == "b") %>%
        select(a1, b1, c1, cat)
    
    c_table = train_data %>%
        filter(cat == "c") %>%
        select(a1, b1, c1, cat)
    
    
    #calculate random quantile ("quant") for each bin
    
    table_a = data.frame(a_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_1)))
    
    table_b = data.frame(b_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_2)))
    
    table_c = data.frame(c_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_3)))
    
    
    
    
    #create a new variable ("diff") that measures if the quantile is bigger than the value of "c1"
    table_a$diff = ifelse(table_a$quant > table_a$c1,1,0)
    table_b$diff = ifelse(table_b$quant > table_b$c1,1,0)
    table_c$diff = ifelse(table_c$quant > table_c$c1,1,0)
    
    #group all tables
    
    final_table = rbind(table_a, table_b, table_c)
    
    #create a table: for each bin, calculate the average of "diff"
    final_table_2 = data.frame(final_table %>%
                                   group_by(cat) %>%
                                   summarize(
                                       mean = mean(diff)
                                   ))
    
    #add "total mean" to this table
    final_table_2 = data.frame(final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff)))
    
    #format this table: add the random criteria to this table for reference
    final_table_2$random_1 = random_1
    
    final_table_2$random_2 = random_2
    
    final_table_2$random_3 = random_3
    
    final_table_2$random_4 = random_4
    
    final_table_2$split_1 = split_1
    
    final_table_2$split_2 = split_2
    
    final_table_2$split_3 = split_3
    
    
    
    
    results_table <- rbind(results_table, final_table_2)
    
    final_results = dcast(setDT(results_table), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
    
}

# create some data for this example
a1 = rnorm(1000,100,10)
b1 = rnorm(1000,100,5)
c1 = sample.int(1000, 1000, replace = TRUE)
train_data = data.frame(a1,b1,c1)




#grid
random_1 <- seq(80,100,5)
random_2 <- seq(85,120,5)
random_3 <- seq(85,120,5)
random_4 <- seq(90,120,5)
split_1 =  seq(0,1,0.1)
split_2 =  seq(0,1,0.1)
split_3 =  seq(0,1,0.1)
DF_1 <- expand.grid(random_1 , random_2, random_3, random_4, split_1, split_2, split_3)

#reduce the size of the grid for this example
DF_1 = DF_1[1:100000,]

colnames(DF_1) <- c("random_1" , "random_2", "random_3",                     "random_4", "split_1", "split_2", "split_3")

train_data_new <- copy(train_data)


resultdf1 <- apply(DF_1,1, # 1 means rows
                   FUN=function(x){
                       do.call(
                           # Call grid_function with the arguments in
                           # a list
                           grid_function,
                           # force list type for the arguments
                           c(list(train_data_new), as.list(
                                # turn the row into a named vector
                               unlist(x)
                           )
                           ))
                   }
)

l = resultdf1
final_output = rbindlist(l, fill = TRUE)

### END

# when finished
stopCluster(cl) 

If I were to run the above code "locally", it would take a very long time. I am now running the code above inside the "makePSOCKcluster" wrapper, and it is still running.

Question: I am not sure whether "makePSOCKcluster" actually makes a difference, and I am also not sure whether I am using the "makePSOCKcluster" wrapper correctly. Can someone tell me if what I am doing is correct? Are there other ways to speed up this code?

Thanks

Here is an attempt.

1. Rewrite the function

The function below is faster. I believe it would be even faster if the data.frames were replaced by matrices (a sketch of that idea follows the function below). The results are not identical to the output of the original function in the question, because some values that were NA are now zero.

library(foreach)
library(parallel)
library(dplyr)
library(data.table)

### START
grid_function_2 <- function(train_data, X) {
  random_1234 <- X[1:4]
  split_123 <- X[ seq_along(X)[-(1:4)] ]
  #bin data according to random criteria
  ia <- which(train_data$a1 <= random_1234[1] & train_data$b1 <= random_1234[3])
  ib <- which(train_data$a1 <= random_1234[2] & train_data$b1 <= random_1234[4])
  train_data$cat <- "c"
  train_data$cat[ia] <- "a"
  train_data$cat[ib] <- "b"
  train_data$cat <- factor(train_data$cat)
  #new splits
  table_abc <- split(train_data, train_data$cat)

  #calculate random quantile ("quant") for each bin

  table_abc <- lapply(seq_along(table_abc), function(i){
    quant <- quantile(table_abc[[i]]$c1, prob = split_123[i])
    diff <- as.integer(quant > table_abc[[i]]$c1)
    cbind(table_abc[[i]], diff = diff)
  })

  #group all tables
  final_table <- do.call(rbind, table_abc)
  final_table_2 <- final_table %>%
    group_by(cat) %>%
    summarise(mean = mean(diff))

  #add "total mean" to this table
  final_table_2 <- final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff))

  #format this table: add the random criteria to this table for reference
  final_table_2$random_1 <- random_1234[1]
  final_table_2$random_2 <- random_1234[2]
  final_table_2$random_3 <- random_1234[3]
  final_table_2$random_4 <- random_1234[4]
  final_table_2$split_1 <- split_123[1]
  final_table_2$split_2 <- split_123[2]
  final_table_2$split_3 <- split_123[3]

  dcast(as.data.table(final_table_2), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
}
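
To illustrate the remark about replacing the data.frames with matrices, here is a minimal, untested sketch; grid_function_3 is a hypothetical name, it works on plain matrix columns and returns a named vector rather than a data.table row.

# sketch only: same logic as grid_function_2, but on a numeric matrix m
# with columns "a1", "b1", "c1" (e.g. m <- as.matrix(train_data_new))
grid_function_3 <- function(m, X) {
  # bin index: 1 = "a", 2 = "b", 3 = "c"; "a" takes priority as in the original nested ifelse
  bin <- ifelse(m[, "a1"] <= X[1] & m[, "b1"] <= X[3], 1L,
                ifelse(m[, "a1"] <= X[2] & m[, "b1"] <= X[4], 2L, 3L))
  # per-bin mean of "diff": share of c1 values below the bin's random quantile
  means <- vapply(1:3, function(i) {
    ci <- m[bin == i, "c1"]
    if (length(ci) == 0L) return(NA_real_)
    mean(quantile(ci, probs = X[4 + i]) > ci)
  }, numeric(1))
  counts <- tabulate(bin, nbins = 3L)
  total <- sum(means * counts, na.rm = TRUE) / nrow(m)  # overall mean of "diff"
  c(X, a = means[1], b = means[2], c = means[3], total = total)
}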

2. Test data

The test data set is smaller and is made reproducible by setting the RNG seed.

# make the results reproducible
set.seed(2021)

# create some data for this example
a1 <- rnorm(1000,100,10)
b1 <- rnorm(1000,100,5)
c1 <- sample.int(1000, 1000, replace = TRUE)
train_data <- data.frame(a1,b1,c1)

#grid
n <- 3
random_1 <- seq(80,100,5)[1:n]
random_2 <- seq(85,120,5)[1:n]
random_3 <- seq(85,120,5)[1:n]
random_4 <- seq(90,120,5)[1:n]
split_1 <- seq(0,1,0.1)[1:n]
split_2 <- seq(0,1,0.1)[1:n]
split_3 <- seq(0,1,0.1)[1:n]
DF_1 <- expand.grid(random_1 , random_2, random_3, random_4, split_1, split_2, split_3)

# smaller data set with 3^7 rows
dim(DF_1)
#[1] 2187    7

## NOT RUN
#reduce the size of the grid for this example
#DF_1 <- DF_1[1:100000,]

colnames(DF_1) <- c("random_1" , "random_2", "random_3", "random_4",
                    "split_1", "split_2", "split_3")

train_data_new <- copy(train_data)

3. Timings

Below are the comparative tests.
The reference value is t0 <- system.time({original apply loop and function}).

Then come three tests: apply with the new function, mclapply (not on Windows), and parLapply.
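
That reference timing t0 wraps the original grid_function and apply loop from the question, roughly like this:

t0 <- system.time({
  # original grid_function and apply loop from the question
  resultdf1 <- apply(DF_1, 1, # 1 means rows
                     FUN = function(x){
                       do.call(grid_function,
                               c(list(train_data_new), as.list(unlist(x))))
                     })
})
final_output <- rbindlist(resultdf1, fill = TRUE)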

t1 <- system.time({
  resultdf1 <- apply(DF_1, 1, # 1 means rows
                     FUN = function(x){
                       grid_function_2(train_data_new, x)
                     }
  )
})
final_output2 <- rbindlist(resultdf1, fill = TRUE)

# This cannot be run on Windows    
t2 <- system.time({
  ncores <- detectCores()
  resultdf1 <- mclapply(1:nrow(DF_1), function(i){
    grid_function_2(train_data_new, DF_1[i, ])
  },
  mc.cores = ncores - 1L)
})
final_output3 <- rbindlist(resultdf1, fill = TRUE)

t3 <- system.time({
  ncores <- detectCores()
  cl <- makePSOCKcluster(ncores - 1L) # reserve 1 cpu core
  clusterExport(cl, "train_data_new")
  clusterExport(cl, "DF_1")
  clusterExport(cl, "grid_function_2")
  clusterEvalQ(cl, "train_data_new")
  clusterEvalQ(cl, "DF_1")
  clusterEvalQ(cl, "grid_function_2")
  clusterEvalQ(cl, library(dplyr))
  clusterEvalQ(cl, library(data.table))
  resultdf1 <- parLapply(
    cl = cl,
    X = 1:nrow(DF_1),
    function(i){
      grid_function_2(train_data_new, DF_1[i, ])
    }
  )
  # when finished
  stopCluster(cl)
})
final_output4 <- rbindlist(resultdf1, fill = TRUE)
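
Since the question registers a doParallel backend, the foreach package loaded above could also drive the same loop with %dopar%. This is only a sketch; t4 and final_output5 are hypothetical names and are not part of the comparison below.

library(doParallel)  # for registerDoParallel(), as in the question

t4 <- system.time({
  ncores <- detectCores()
  cl <- makePSOCKcluster(ncores - 1L) # reserve 1 cpu core
  registerDoParallel(cl)
  # foreach auto-exports the globals referenced in the body
  # (grid_function_2, train_data_new, DF_1); if not, list them in .export.
  # .packages loads dplyr and data.table on each worker.
  resultdf1 <- foreach(i = 1:nrow(DF_1),
                       .packages = c("dplyr", "data.table")) %dopar% {
    grid_function_2(train_data_new, DF_1[i, ])
  }
  stopCluster(cl)
})
final_output5 <- rbindlist(resultdf1, fill = TRUE)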

4. Check the results

As I said above, there is an NA value mismatch when comparing the original final_output with this final_output2.

all.equal(final_output, final_output2)
#[1] "Column 'b': 'is.NA' value mismatch: 0 in current 243 in target"

identical(final_output2, final_output3)
#[1] TRUE
identical(final_output2, final_output4)
#[1] TRUE

As for the timings, drop t2 if the code is run on Windows. The fastest are t2 and t3, roughly an order of magnitude faster.

rbind(t0, t1, t2, t3)