运行 在 R 中并行检测断点 (lm)
Run breakpoint (lm) detection in parallel in R
我在 R 中进行了大约 80000 个时间序列断点检测计算。我有所有这些极其不同的时间序列,我无法应用 ARIMA 模型,所以我计算每个时间序列的线性模型,然后提取断点并使用回归的拟合结果,以计算来自最后一个断点的趋势。
在上面的示例中,算法将检测到三个断点(一个倾斜、一个相当平坦和一个下降)。它非常适合我的需要,但是 运行 每周连续一次计算 80k 断点的开销太大了,因此我试图通过在 R 中使用并行处理来实现它。
在这个例子中(在下面的数据中找到 link)我按顺序计算断点,所有 88k 大约需要 24 小时。
df.subset <- read.csv("dfsubset.csv)"
start <- Sys.time()
All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
Sys.time() - start
在这段代码片段中,我 运行 检测了 10 个时间序列(在我的 mac 上),耗时 47 秒 。我猜想并行化应该将这个基准时间减少到大约 1/4 pf 的时间。
下面我列出了三种尝试并行化计算的方法,但我无法让嵌套应用程序在并行设置中工作。
使用并行包
clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(tidyr); library(magrittr)})
myfunction <- function(df.subset) {
All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}
clusterExport(clus, "myfunction")
do.call(bind_rows, parApply(clus, df.subset, 1,{function(r) {
myfunction(r[1]) }}))
使用 multidplyr 包:
library(multidplyr)
cluster <- create_cluster(4)
set_default_cluster(cluster)
four <- function(x) {
All.Breakpoints <- x %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}
cluster_assign_value(cluster, 'four', four)
save <- df.subset %>% partition(CONC_ID) %>% map(four(.))
使用并行包但其他分组
library(parallel)
cl <- detectCores()
group <- df.subset %>% group_by(CONC_ID) %>% group_indices
df.subset <- bind_cols(tibble(group), df.subset)
cluster <- create_cluster(cores = cl)
by_group <- df.subset %>%
partition(group, cluster = cluster)
by_group %>%
# Assign libraries
cluster_library("tidyr") %>%
cluster_library("dplyr") %>%
cluster_library("strucchange") %>%
cluster_library("purrr") %>%
# Assign values (use this to load functions or data to each core)
cluster_assign_value("df.subset", df.subset)
cluster_eval(by_group, search())[[1]] # results for first cluster shown
only
cluster_get(by_group, "df.subset")[[1]]
start <- proc.time() # Start clock
sp_500_processed_in_parallel <- by_group %>% # Use by_group party_df
group_by(CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
%>%
collect() %>% # Special collect() function to recombine partitions
as_tibble() # Convert to tibble
time_elapsed_parallel <- proc.time() - start # End clock
time_elapsed_parallel
Link 到文件:
http://www.filedropper.com/dfsubset
感谢您的想法和反馈!
大多数时候提出问题并描述问题会自行解决...我发现 mutate 在任何地方都不起作用(老实说,Whosebug)在 R 中并行。
因此我改为使用 do 并通过 multidplyr 分配负载,并在运行时减少了大约 50% 的计算时间从 1 核到 4 核,从 1 核到 8 核时占总时间的 25%。
下面的代码。
## parallel
cl <- detectCores()
cl
df.cluster <- df.subset
cluster <- create_cluster(cores = cl)
cluster
by_group <- df.cluster %>%
partition(CONC_ID, cluster = cluster)
by_group
by_group %>%
# Assign libraries
cluster_library("strucchange")
cluster_eval(by_group, search())[[1]] # results for first cluster shown only
start <- proc.time() # Start clock
cluster.processed <- by_group %>%
do(model = breakpoints(ACT_QTY_new ~ Index, data = .)) %>%
collect()
time_elapsed_parallel <- proc.time() - start # End clock
time_elapsed_parallel
rm(by_grou)
gc()
Predictions <- cluster.processed %>%
mutate(SegmentedForecast = map(model, fitted))
df.fitted.vector <- as.data.frame(rowwise(Predictions[,3])) .
我在 R 中进行了大约 80000 个时间序列断点检测计算。我有所有这些极其不同的时间序列,我无法应用 ARIMA 模型,所以我计算每个时间序列的线性模型,然后提取断点并使用回归的拟合结果,以计算来自最后一个断点的趋势。
在上面的示例中,算法将检测到三个断点(一个倾斜、一个相当平坦和一个下降)。它非常适合我的需要,但是 运行 每周连续一次计算 80k 断点的开销太大了,因此我试图通过在 R 中使用并行处理来实现它。
在这个例子中(在下面的数据中找到 link)我按顺序计算断点,所有 88k 大约需要 24 小时。
df.subset <- read.csv("dfsubset.csv)"
start <- Sys.time()
All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
Sys.time() - start
在这段代码片段中,我 运行 检测了 10 个时间序列(在我的 mac 上),耗时 47 秒 。我猜想并行化应该将这个基准时间减少到大约 1/4 pf 的时间。
下面我列出了三种尝试并行化计算的方法,但我无法让嵌套应用程序在并行设置中工作。
使用并行包
clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(tidyr); library(magrittr)})
myfunction <- function(df.subset) {
All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}
clusterExport(clus, "myfunction")
do.call(bind_rows, parApply(clus, df.subset, 1,{function(r) {
myfunction(r[1]) }}))
使用 multidplyr 包:
library(multidplyr)
cluster <- create_cluster(4)
set_default_cluster(cluster)
four <- function(x) {
All.Breakpoints <- x %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}
cluster_assign_value(cluster, 'four', four)
save <- df.subset %>% partition(CONC_ID) %>% map(four(.))
使用并行包但其他分组
library(parallel)
cl <- detectCores()
group <- df.subset %>% group_by(CONC_ID) %>% group_indices
df.subset <- bind_cols(tibble(group), df.subset)
cluster <- create_cluster(cores = cl)
by_group <- df.subset %>%
partition(group, cluster = cluster)
by_group %>%
# Assign libraries
cluster_library("tidyr") %>%
cluster_library("dplyr") %>%
cluster_library("strucchange") %>%
cluster_library("purrr") %>%
# Assign values (use this to load functions or data to each core)
cluster_assign_value("df.subset", df.subset)
cluster_eval(by_group, search())[[1]] # results for first cluster shown
only
cluster_get(by_group, "df.subset")[[1]]
start <- proc.time() # Start clock
sp_500_processed_in_parallel <- by_group %>% # Use by_group party_df
group_by(CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
%>%
collect() %>% # Special collect() function to recombine partitions
as_tibble() # Convert to tibble
time_elapsed_parallel <- proc.time() - start # End clock
time_elapsed_parallel
Link 到文件:
http://www.filedropper.com/dfsubset
感谢您的想法和反馈!
大多数时候提出问题并描述问题会自行解决...我发现 mutate 在任何地方都不起作用(老实说,Whosebug)在 R 中并行。
因此我改为使用 do 并通过 multidplyr 分配负载,并在运行时减少了大约 50% 的计算时间从 1 核到 4 核,从 1 核到 8 核时占总时间的 25%。
下面的代码。
## parallel
cl <- detectCores()
cl
df.cluster <- df.subset
cluster <- create_cluster(cores = cl)
cluster
by_group <- df.cluster %>%
partition(CONC_ID, cluster = cluster)
by_group
by_group %>%
# Assign libraries
cluster_library("strucchange")
cluster_eval(by_group, search())[[1]] # results for first cluster shown only
start <- proc.time() # Start clock
cluster.processed <- by_group %>%
do(model = breakpoints(ACT_QTY_new ~ Index, data = .)) %>%
collect()
time_elapsed_parallel <- proc.time() - start # End clock
time_elapsed_parallel
rm(by_grou)
gc()
Predictions <- cluster.processed %>%
mutate(SegmentedForecast = map(model, fitted))
df.fitted.vector <- as.data.frame(rowwise(Predictions[,3])) .