R中foreach循环的结果

results from foreach loop in R

我有一个函数需要在 2000 个数据帧上 运行。每次迭代都需要很长时间,即近 40 分钟,因此我在 R 中使用 'foreach' 包。 我通过以下方式生成了数据:

library(foreach)
library(doParallel)
library(data.table)
library(matrixStats)
# DATA
datalist <- list()
for (i in 1:2000){
  set.seed(i)
  x1 <- rnorm(600,0.05,0.3)
  x2 <- rnorm(600,-2,0.25)
  data_2 <- data.frame(x1,x2)
  lin_pred <- 1+(0.2 * data_2[1]) + (1.2*data_2[2]) 
  prob <- 1/(1+exp(-lin_pred))
  index <- rep(1:100, each = 6)
  data <- data.frame(index,prob)
  data_split <- split(data,f=data$index)
  create_y <- function(p){
    
    y <- c()
    y_1 <- rbinom(1,1,p[1,2])
    y <- append(y,y_1)
    for(i in 2:6){
      pr <- p[i,2] + 0.05*(y[i-1]-p[i-1,2])
      u <- rbinom(1,1,pr)
      y <- append(y,u)
    }
    return(y)
  }
  
  res <- lapply(data_split,create_y)
  y <- data.frame(x=unlist(res))
  final_data <- data.frame(index,y,x1,x2)
  datalist[[i]] <- final_data
}

现在我已经定义了我的 objective 函数,它将针对上面定义的列表中的每个数据集进行优化:

## Objective function:
dpd_tdependent <- function(x, fixed = c(rep(FALSE,5))){
  params <- fixed
  dpd <- function(p){
    params[!fixed] <- p
    alpha <- params[1]
    beta_0 <- params[2]
    beta_1 <- params[3]
    beta_2 <- params[4]
    rho <- params[5]
    add_pi <- function(d){
      k <- beta_0+(d[3]*beta_1)+(d[4]*beta_2)
      k1 <- exp(k)/(1+exp(k))
      p <- c()
      p <- append(p,k1[1,1])
      for(i in 2:6){
        u <- k1[i,1] + rho*(d[i-1,2]-k1[i-1,1])
        p <- append(p,u)
      }
      d <- cbind(d,p)
    }
    dat_split <- split(x , f  = x$index)
    result <- lapply(dat_split, add_pi)
    
    result <- rbindlist(result)
    result <- as.data.frame(result)
    colnames(result) <- c('index','y','x1','x2','exp_prob')
    result_split <- split(result, f = result$index)
    ## First expression
    full_prob <- function(d){
      k <- as.data.frame(expand.grid(c(0,1),c(0,1),c(0,1),c(0,1),c(0,1),c(0,1)))
      k <- as.data.frame(t(k))
      val <- c()
      for(j in 1:ncol(k)){
        d[2]<- k[j]
        m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
        for(i in 2:nrow(d)){
          c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
          m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c 
        }
        val <- append(val,m)
      }
      val <- val^(1+alpha)
      return(sum(val))
    }
    first_exp <- lapply(result_split,full_prob)
    first_exp <- as.vector(unlist(first_exp))
    
    ## Second expression:
    compute_prob <- function(d){
      m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
      for(i in 2:nrow(d)){
        c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
        m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c 
      }
      
      return(m^alpha)
    }
    second_exp <- lapply(result_split,compute_prob)
    second_exp <- as.vector(unlist(second_exp))
    
    final_res <- first_exp - ((1+1/alpha)*(second_exp))
    final_result <- sum(final_res)
    
  }
}

最后,我使用 foreach 包来加速这个过程:

cl = makeCluster(6)
registerDoParallel(cl)




mse <- matrix(,nrow=2000,ncol=5)
foreach(i = 1:2000, .packages = c('data.table','matrixStats'), .export = c('mse','datalist'))%dopar%{
  beta <- rbind(1,0.2,1.2,0.05)
  val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
  b_s <- as.vector(optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)$par)
  conv <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho = 0.001),val)$convergence
  k <- (b_s -beta)
  k <- append(k,conv)
  mse[i,] <- rbind(k)
  if(colCounts(mse , value = 0, na.rm = TRUE)[5] == 500) break
}
stopCluster(cl)

现在,我的问题是在 运行 在 foreach 包中使用 optim 函数后,矩阵 mse 没有填充所需的值。我知道我可以使用 foreach 函数中的 .combine 特性来 return 一个矩阵,但是我将如何为这样的矩阵分配一个名称?我的意思是,如果我无法分配名称,我将如何检查最后一个条件(在 if 内)并中断?

我非常感谢这方面的任何帮助。 谢谢。

我认为您不应该尝试修改每个 worker 中的全局变量。请参阅我上面的评论和 link。如果 500 次迭代收敛 = 0,则不应在迭代过程中进行检查,因为该信息不可用于每次迭代。以下是return你想要的

的一个选项
cl = makeCluster(6)
registerDoParallel(cl)

mse = foreach(i = 1:2000, .packages = c('data.table','matrixStats')) %dopar%{
  beta <- rbind(1,0.2,1.2,0.05)
  val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
  optim_sol <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)
  b_s <- optim_sol$par
  conv <- optim_sol$convergence
  c(b_s-beta,conv,i)
}
mse <- matrix(unlist(m),nrow=2000, byrow=T)

stopCluster(cl)