R: Understanding the impact of "makePSOCKcluster"
I am using the R programming language. I recently came across the function makePSOCKcluster, which can apparently be used to "speed up" the execution of certain tasks in R (https://www.rdocumentation.org/packages/future/versions/1.19.1/topics/makeClusterPSOCK).
Based on some posts I have seen, makePSOCKcluster seems to work like a "wrapper" into which you put the code you want to "speed up", for example:
library(doParallel)
library(future)
cl <- makePSOCKcluster(6) # 6 cpu cores out of 8
registerDoParallel(cl)
### enter your code here
stopCluster(cl) # when finished
I tried adapting this setup to help speed up an R program I am working with:
cl <- makePSOCKcluster(6) # 6 cpu cores out of 8
registerDoParallel(cl)
### START
library(dplyr)
library(data.table)
results_table <- data.frame()
grid_function <- function(train_data, random_1, random_2, random_3, random_4, split_1, split_2, split_3) {
#bin data according to random criteria
train_data <- train_data %>% mutate(cat = ifelse(a1 <= random_1 & b1 <= random_3, "a", ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")))
train_data$cat = as.factor(train_data$cat)
#new splits
a_table = train_data %>%
filter(cat == "a") %>%
select(a1, b1, c1, cat)
b_table = train_data %>%
filter(cat == "b") %>%
select(a1, b1, c1, cat)
c_table = train_data %>%
filter(cat == "c") %>%
select(a1, b1, c1, cat)
#calculate random quantile ("quant") for each bin
table_a = data.frame(a_table%>% group_by(cat) %>%
mutate(quant = quantile(c1, prob = split_1)))
table_b = data.frame(b_table%>% group_by(cat) %>%
mutate(quant = quantile(c1, prob = split_2)))
table_c = data.frame(c_table%>% group_by(cat) %>%
mutate(quant = quantile(c1, prob = split_3)))
#create a new variable ("diff") that measures if the quantile is bigger than the value of "c1"
table_a$diff = ifelse(table_a$quant > table_a$c1,1,0)
table_b$diff = ifelse(table_b$quant > table_b$c1,1,0)
table_c$diff = ifelse(table_c$quant > table_c$c1,1,0)
#group all tables
final_table = rbind(table_a, table_b, table_c)
#create a table: for each bin, calculate the average of "diff"
final_table_2 = data.frame(final_table %>%
group_by(cat) %>%
summarize(
mean = mean(diff)
))
#add "total mean" to this table
final_table_2 = data.frame(final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff)))
#format this table: add the random criteria to this table for reference
final_table_2$random_1 = random_1
final_table_2$random_2 = random_2
final_table_2$random_3 = random_3
final_table_2$random_4 = random_4
final_table_2$split_1 = split_1
final_table_2$split_2 = split_2
final_table_2$split_3 = split_3
results_table <- rbind(results_table, final_table_2)
final_results = dcast(setDT(results_table), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
}
# create some data for this example
a1 = rnorm(1000,100,10)
b1 = rnorm(1000,100,5)
c1 = sample.int(1000, 1000, replace = TRUE)
train_data = data.frame(a1,b1,c1)
#grid
random_1 <- seq(80,100,5)
random_2 <- seq(85,120,5)
random_3 <- seq(85,120,5)
random_4 <- seq(90,120,5)
split_1 = seq(0,1,0.1)
split_2 = seq(0,1,0.1)
split_3 = seq(0,1,0.1)
DF_1 <- expand.grid(random_1 , random_2, random_3, random_4, split_1, split_2, split_3)
#reduce the size of the grid for this example
DF_1 = DF_1[1:100000,]
colnames(DF_1) <- c("random_1" , "random_2", "random_3", "random_4", "split_1", "split_2", "split_3")
train_data_new <- copy(train_data)
resultdf1 <- apply(DF_1,1, # 1 means rows
FUN=function(x){
do.call(
# call grid_function with the arguments in
# a list
grid_function,
# force list type for the arguments
c(list(train_data_new), as.list(
# make the row to a named vector
unlist(x)
)
))
}
)
l = resultdf1
final_output = rbindlist(l, fill = TRUE)
### END
# when finished
stopCluster(cl)
If I were to run the above code "locally", it would take a very long time. I ran the code above with the "makePSOCKcluster" wrapper, and it is still running.
Question: I am not sure whether "makePSOCKcluster" actually makes a difference, and I am also not sure whether I am using the "makePSOCKcluster" wrapper in the right way. Can someone tell me whether what I am doing is correct? Are there other ways to speed up this code?
Thanks!
Here is an attempt.
1. Rewrite the function
The function below is faster. I believe it would become faster still if the data.frames were replaced by matrices or plain vectors; a hedged sketch of that idea follows the function. The result is not identical to the output of the original function in the question, because some NA values are now zeros.
library(foreach)
library(parallel)
library(dplyr)
library(data.table)
### START
grid_function_2 <- function(train_data, X) {
random_1234 <- X[1:4]
split_123 <- X[ seq_along(X)[-(1:4)] ]
#bin data according to random criteria
ia <- which(train_data$a1 <= random_1234[1] & train_data$b1 <= random_1234[3])
ib <- which(train_data$a1 <= random_1234[2] & train_data$b1 <= random_1234[4])
train_data$cat <- "c"
train_data$cat[ia] <- "a"
train_data$cat[ib] <- "b"
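# note: rows matching both conditions end up in bin "b" here (this assignment runs last),
# whereas the nested ifelse in the question gives bin "a" priority for such rows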
train_data$cat <- factor(train_data$cat)
#new splits
table_abc <- split(train_data, train_data$cat)
#calculate random quantile ("quant") for each bin
table_abc <- lapply(seq_along(table_abc), function(i){
quant <- quantile(table_abc[[i]]$c1, prob = split_123[i])
diff <- as.integer(quant > table_abc[[i]]$c1)
cbind(table_abc[[i]], diff = diff)
})
#group all tables
final_table <- do.call(rbind, table_abc)
final_table_2 <- final_table %>%
group_by(cat) %>%
summarise(mean = mean(diff))
#add "total mean" to this table
final_table_2 <- final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff))
#format this table: add the random criteria to this table for reference
final_table_2$random_1 <- random_1234[1]
final_table_2$random_2 <- random_1234[2]
final_table_2$random_3 <- random_1234[3]
final_table_2$random_4 <- random_1234[4]
final_table_2$split_1 <- split_123[1]
final_table_2$split_2 <- split_123[2]
final_table_2$split_3 <- split_123[3]
dcast(as.data.table(final_table_2), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
}
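As a hedged illustration of the data.frame-to-vectors idea mentioned above (the name grid_function_vec and its vector-valued return are introduced here for illustration only, and it has not been benchmarked):
grid_function_vec <- function(a1, b1, c1, X) {
  # bin the data as grid_function_2 does (the "b" assignment runs last)
  cat <- rep("c", length(a1))
  cat[a1 <= X[1] & b1 <= X[3]] <- "a"
  cat[a1 <= X[2] & b1 <= X[4]] <- "b"
  # work on plain numeric vectors per bin instead of data.frames
  c_by_bin <- split(c1, cat)
  probs    <- X[5:7][match(names(c_by_bin), c("a", "b", "c"))]
  diffs    <- Map(function(v, p) as.integer(quantile(v, probs = p) > v), c_by_bin, probs)
  # per-bin means of "diff" plus the overall mean, as a named numeric vector
  c(vapply(diffs, mean, numeric(1)), total = mean(unlist(diffs)))
}
# usage: grid_function_vec(train_data_new$a1, train_data_new$b1, train_data_new$c1, unlist(DF_1[i, ]))
Reshaping that vector into the dcast table returned by grid_function_2 is left out of the sketch.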
2. Test data
The test data set is smaller, and setting the RNG seed makes it reproducible.
# make the results reproducible
set.seed(2021)
# create some data for this example
a1 <- rnorm(1000,100,10)
b1 <- rnorm(1000,100,5)
c1 <- sample.int(1000, 1000, replace = TRUE)
train_data <- data.frame(a1,b1,c1)
#grid
n <- 3
random_1 <- seq(80,100,5)[1:n]
random_2 <- seq(85,120,5)[1:n]
random_3 <- seq(85,120,5)[1:n]
random_4 <- seq(90,120,5)[1:n]
split_1 <- seq(0,1,0.1)[1:n]
split_2 <- seq(0,1,0.1)[1:n]
split_3 <- seq(0,1,0.1)[1:n]
DF_1 <- expand.grid(random_1 , random_2, random_3, random_4, split_1, split_2, split_3)
# smaller data set with 3^7 rows
dim(DF_1)
#[1] 2187 7
## NOT RUN
#reduce the size of the grid for this example
#DF_1 <- DF_1[1:100000,]
colnames(DF_1) <- c("random_1" , "random_2", "random_3", "random_4",
"split_1", "split_2", "split_3")
train_data_new <- copy(train_data)
3. Timings
Below are the comparative tests. The reference value is t0 <- system.time({original apply loop and function}); a sketch of t0 follows this paragraph. Then come three tests: apply with the new function, mclapply (not on Windows), and parLapply.
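For reference, t0 times the original grid_function and apply loop from the question (a sketch, assuming grid_function, DF_1 and train_data_new are defined as above; it also produces the final_output compared against in section 4):
t0 <- system.time({
  resultdf0 <- apply(DF_1, 1, function(x) {
    # same call pattern as in the question, using the original grid_function
    do.call(grid_function, c(list(train_data_new), as.list(unlist(x))))
  })
  final_output <- rbindlist(resultdf0, fill = TRUE)
})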
t1 <- system.time({
resultdf1 <- apply(DF_1, 1, # 1 means rows
FUN = function(x){
grid_function_2(train_data_new, x)
}
)
})
final_output2 <- rbindlist(resultdf1, fill = TRUE)
# This cannot be run on Windows
t2 <- system.time({
ncores <- detectCores()
resultdf1 <- mclapply(1:nrow(DF_1), function(i){
grid_function_2(train_data_new, DF_1[i, ])
},
mc.cores = ncores - 1L)
})
final_output3 <- rbindlist(resultdf1, fill = TRUE)
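On Windows, mclapply does not support mc.cores > 1, so the t2 block above should be skipped, or guarded roughly like this:
if (.Platform$OS.type != "windows") {
  t2 <- system.time({
    resultdf1 <- mclapply(seq_len(nrow(DF_1)),
                          function(i) grid_function_2(train_data_new, DF_1[i, ]),
                          mc.cores = detectCores() - 1L)
  })
  final_output3 <- rbindlist(resultdf1, fill = TRUE)
}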
t3 <- system.time({
ncores <- detectCores()
cl <- makePSOCKcluster(ncores - 1L) # reserve 1 cpu core
clusterExport(cl, "train_data_new")
clusterExport(cl, "DF_1")
clusterExport(cl, "grid_function_2")
clusterEvalQ(cl, "train_data_new")
clusterEvalQ(cl, "DF_1")
clusterEvalQ(cl, "grid_function_2")
clusterEvalQ(cl, library(dplyr))
clusterEvalQ(cl, library(data.table))
resultdf1 <- parLapply(
cl = cl,
X = 1:nrow(DF_1),
function(i){
grid_function_2(train_data_new, DF_1[i, ])
}
)
# when finished
stopCluster(cl)
})
final_output4 <- rbindlist(resultdf1, fill = TRUE)
4. Check the results
As I said above, there is a mismatch of NA values when comparing the original final_output with this final_output2.
all.equal(final_output, final_output2)
#[1] "Column 'b': 'is.NA' value mismatch: 0 in current 243 in target"
identical(final_output2, final_output3)
#[1] TRUE
identical(final_output2, final_output4)
#[1] TRUE
As for the timings, remove t2 if the code is run on Windows. The fastest are t2 and t3, roughly an order of magnitude faster.
rbind(t0, t1, t2, t3)
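A final note on the original question: registerDoParallel(cl) only supplies a backend for loops written with foreach(...) %dopar%; a plain apply() call is not affected and keeps running sequentially in the master R session, which is why wrapping the code in makePSOCKcluster/registerDoParallel alone made no difference. Below is a hedged, untimed sketch of the same work dispatched through foreach (resultdf_fe and final_output5 are names introduced here):
library(doParallel)
cl <- makePSOCKcluster(detectCores() - 1L)
registerDoParallel(cl)
resultdf_fe <- foreach(i = seq_len(nrow(DF_1)),
                       .packages = c("dplyr", "data.table"),
                       # explicit exports; foreach may also detect these automatically
                       .export = c("grid_function_2", "train_data_new", "DF_1")) %dopar% {
  grid_function_2(train_data_new, DF_1[i, ])
}
stopCluster(cl)
final_output5 <- rbindlist(resultdf_fe, fill = TRUE)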