R中foreach循环的结果
results from foreach loop in R
我有一个函数需要在 2000 个数据帧上 运行。每次迭代都需要很长时间,即近 40 分钟,因此我在 R 中使用 'foreach' 包。
我通过以下方式生成了数据:
library(foreach)
library(doParallel)
library(data.table)
library(matrixStats)
# DATA
datalist <- list()
for (i in 1:2000){
set.seed(i)
x1 <- rnorm(600,0.05,0.3)
x2 <- rnorm(600,-2,0.25)
data_2 <- data.frame(x1,x2)
lin_pred <- 1+(0.2 * data_2[1]) + (1.2*data_2[2])
prob <- 1/(1+exp(-lin_pred))
index <- rep(1:100, each = 6)
data <- data.frame(index,prob)
data_split <- split(data,f=data$index)
create_y <- function(p){
y <- c()
y_1 <- rbinom(1,1,p[1,2])
y <- append(y,y_1)
for(i in 2:6){
pr <- p[i,2] + 0.05*(y[i-1]-p[i-1,2])
u <- rbinom(1,1,pr)
y <- append(y,u)
}
return(y)
}
res <- lapply(data_split,create_y)
y <- data.frame(x=unlist(res))
final_data <- data.frame(index,y,x1,x2)
datalist[[i]] <- final_data
}
现在我已经定义了我的 objective 函数,它将针对上面定义的列表中的每个数据集进行优化:
## Objective function:
dpd_tdependent <- function(x, fixed = c(rep(FALSE,5))){
params <- fixed
dpd <- function(p){
params[!fixed] <- p
alpha <- params[1]
beta_0 <- params[2]
beta_1 <- params[3]
beta_2 <- params[4]
rho <- params[5]
add_pi <- function(d){
k <- beta_0+(d[3]*beta_1)+(d[4]*beta_2)
k1 <- exp(k)/(1+exp(k))
p <- c()
p <- append(p,k1[1,1])
for(i in 2:6){
u <- k1[i,1] + rho*(d[i-1,2]-k1[i-1,1])
p <- append(p,u)
}
d <- cbind(d,p)
}
dat_split <- split(x , f = x$index)
result <- lapply(dat_split, add_pi)
result <- rbindlist(result)
result <- as.data.frame(result)
colnames(result) <- c('index','y','x1','x2','exp_prob')
result_split <- split(result, f = result$index)
## First expression
full_prob <- function(d){
k <- as.data.frame(expand.grid(c(0,1),c(0,1),c(0,1),c(0,1),c(0,1),c(0,1)))
k <- as.data.frame(t(k))
val <- c()
for(j in 1:ncol(k)){
d[2]<- k[j]
m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
for(i in 2:nrow(d)){
c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c
}
val <- append(val,m)
}
val <- val^(1+alpha)
return(sum(val))
}
first_exp <- lapply(result_split,full_prob)
first_exp <- as.vector(unlist(first_exp))
## Second expression:
compute_prob <- function(d){
m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
for(i in 2:nrow(d)){
c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c
}
return(m^alpha)
}
second_exp <- lapply(result_split,compute_prob)
second_exp <- as.vector(unlist(second_exp))
final_res <- first_exp - ((1+1/alpha)*(second_exp))
final_result <- sum(final_res)
}
}
最后,我使用 foreach 包来加速这个过程:
cl = makeCluster(6)
registerDoParallel(cl)
mse <- matrix(,nrow=2000,ncol=5)
foreach(i = 1:2000, .packages = c('data.table','matrixStats'), .export = c('mse','datalist'))%dopar%{
beta <- rbind(1,0.2,1.2,0.05)
val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
b_s <- as.vector(optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)$par)
conv <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho = 0.001),val)$convergence
k <- (b_s -beta)
k <- append(k,conv)
mse[i,] <- rbind(k)
if(colCounts(mse , value = 0, na.rm = TRUE)[5] == 500) break
}
stopCluster(cl)
现在,我的问题是在 运行 在 foreach 包中使用 optim 函数后,矩阵 mse 没有填充所需的值。我知道我可以使用 foreach 函数中的 .combine 特性来 return 一个矩阵,但是我将如何为这样的矩阵分配一个名称?我的意思是,如果我无法分配名称,我将如何检查最后一个条件(在 if 内)并中断?
我非常感谢这方面的任何帮助。
谢谢。
我认为您不应该尝试修改每个 worker 中的全局变量。请参阅我上面的评论和 link。如果 500 次迭代收敛 = 0,则不应在迭代过程中进行检查,因为该信息不可用于每次迭代。以下是return你想要的
的一个选项
cl = makeCluster(6)
registerDoParallel(cl)
mse = foreach(i = 1:2000, .packages = c('data.table','matrixStats')) %dopar%{
beta <- rbind(1,0.2,1.2,0.05)
val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
optim_sol <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)
b_s <- optim_sol$par
conv <- optim_sol$convergence
c(b_s-beta,conv,i)
}
mse <- matrix(unlist(m),nrow=2000, byrow=T)
stopCluster(cl)
我有一个函数需要在 2000 个数据帧上 运行。每次迭代都需要很长时间,即近 40 分钟,因此我在 R 中使用 'foreach' 包。 我通过以下方式生成了数据:
library(foreach)
library(doParallel)
library(data.table)
library(matrixStats)
# DATA
datalist <- list()
for (i in 1:2000){
set.seed(i)
x1 <- rnorm(600,0.05,0.3)
x2 <- rnorm(600,-2,0.25)
data_2 <- data.frame(x1,x2)
lin_pred <- 1+(0.2 * data_2[1]) + (1.2*data_2[2])
prob <- 1/(1+exp(-lin_pred))
index <- rep(1:100, each = 6)
data <- data.frame(index,prob)
data_split <- split(data,f=data$index)
create_y <- function(p){
y <- c()
y_1 <- rbinom(1,1,p[1,2])
y <- append(y,y_1)
for(i in 2:6){
pr <- p[i,2] + 0.05*(y[i-1]-p[i-1,2])
u <- rbinom(1,1,pr)
y <- append(y,u)
}
return(y)
}
res <- lapply(data_split,create_y)
y <- data.frame(x=unlist(res))
final_data <- data.frame(index,y,x1,x2)
datalist[[i]] <- final_data
}
现在我已经定义了我的 objective 函数,它将针对上面定义的列表中的每个数据集进行优化:
## Objective function:
dpd_tdependent <- function(x, fixed = c(rep(FALSE,5))){
params <- fixed
dpd <- function(p){
params[!fixed] <- p
alpha <- params[1]
beta_0 <- params[2]
beta_1 <- params[3]
beta_2 <- params[4]
rho <- params[5]
add_pi <- function(d){
k <- beta_0+(d[3]*beta_1)+(d[4]*beta_2)
k1 <- exp(k)/(1+exp(k))
p <- c()
p <- append(p,k1[1,1])
for(i in 2:6){
u <- k1[i,1] + rho*(d[i-1,2]-k1[i-1,1])
p <- append(p,u)
}
d <- cbind(d,p)
}
dat_split <- split(x , f = x$index)
result <- lapply(dat_split, add_pi)
result <- rbindlist(result)
result <- as.data.frame(result)
colnames(result) <- c('index','y','x1','x2','exp_prob')
result_split <- split(result, f = result$index)
## First expression
full_prob <- function(d){
k <- as.data.frame(expand.grid(c(0,1),c(0,1),c(0,1),c(0,1),c(0,1),c(0,1)))
k <- as.data.frame(t(k))
val <- c()
for(j in 1:ncol(k)){
d[2]<- k[j]
m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
for(i in 2:nrow(d)){
c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c
}
val <- append(val,m)
}
val <- val^(1+alpha)
return(sum(val))
}
first_exp <- lapply(result_split,full_prob)
first_exp <- as.vector(unlist(first_exp))
## Second expression:
compute_prob <- function(d){
m <- d[1,5]^d[1,2] * ((1-d[1,5])^(1-d[1,2]))
for(i in 2:nrow(d)){
c <- 1+ (rho*((d[i,2]-d[i,5])*(d[i-1,2]-d[i-1,5]))/(sqrt(abs(d[i,5]*d[i-1,5]*(1-d[i,5])*(1-d[i-1,5])))))
m <- m* d[i,5]^d[i,2] * (1-d[i,5])^(1-d[i,2]) *c
}
return(m^alpha)
}
second_exp <- lapply(result_split,compute_prob)
second_exp <- as.vector(unlist(second_exp))
final_res <- first_exp - ((1+1/alpha)*(second_exp))
final_result <- sum(final_res)
}
}
最后,我使用 foreach 包来加速这个过程:
cl = makeCluster(6)
registerDoParallel(cl)
mse <- matrix(,nrow=2000,ncol=5)
foreach(i = 1:2000, .packages = c('data.table','matrixStats'), .export = c('mse','datalist'))%dopar%{
beta <- rbind(1,0.2,1.2,0.05)
val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
b_s <- as.vector(optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)$par)
conv <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho = 0.001),val)$convergence
k <- (b_s -beta)
k <- append(k,conv)
mse[i,] <- rbind(k)
if(colCounts(mse , value = 0, na.rm = TRUE)[5] == 500) break
}
stopCluster(cl)
现在,我的问题是在 运行 在 foreach 包中使用 optim 函数后,矩阵 mse 没有填充所需的值。我知道我可以使用 foreach 函数中的 .combine 特性来 return 一个矩阵,但是我将如何为这样的矩阵分配一个名称?我的意思是,如果我无法分配名称,我将如何检查最后一个条件(在 if 内)并中断?
我非常感谢这方面的任何帮助。 谢谢。
我认为您不应该尝试修改每个 worker 中的全局变量。请参阅我上面的评论和 link。如果 500 次迭代收敛 = 0,则不应在迭代过程中进行检查,因为该信息不可用于每次迭代。以下是return你想要的
的一个选项cl = makeCluster(6)
registerDoParallel(cl)
mse = foreach(i = 1:2000, .packages = c('data.table','matrixStats')) %dopar%{
beta <- rbind(1,0.2,1.2,0.05)
val <- dpd_tdependent(datalist[[i]], c(0.7,FALSE,FALSE,FALSE,FALSE))
optim_sol <- optim(c(beta_0 =0.7, beta_1 =0.05 ,beta_2 = 0.9,rho=0.001),val)
b_s <- optim_sol$par
conv <- optim_sol$convergence
c(b_s-beta,conv,i)
}
mse <- matrix(unlist(m),nrow=2000, byrow=T)
stopCluster(cl)