有没有办法在 xts 中将 period.apply 与 doParallel 和 foreach 一起使用?
Is there a way to use period.apply with doParallel and foreach in xts?
我想在 R 中并行化一个 period.apply 函数,我正在尝试将 doParallel
与 Foreach
一起使用,但我不知道如何实现它功能。我正在使用的数据是一个带有日期时间索引和变量值的 xts
对象,我想做的是每 5 秒计算一次数据的平均值:
VAR
2018-01-01 00:00:00 1945.054
2018-01-01 00:00:02 1944.940
2018-01-01 00:00:05 1945.061
2018-01-01 00:00:07 1945.255
2018-01-01 00:00:10 1945.007
2018-01-01 00:00:12 1944.995
这是我编写的代码示例,但它不起作用:
library(xts)
library(doParallel)
library(foreach)
cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)
ends <- endpoints(x,"secs",5)
m <- foreach(i = 1:length(index(x))) %dopar% period.apply(x,ends,mean)
index(m) <- foreach(m) %dopar% trunc(index(m),"secs")
stopCluster()
有效的代码是这样的,但对于更大的数据库来说,它需要太多时间:
ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")
有没有更有效的方法?
提前致谢。
您是否在一些简单的数据集上尝试过您的代码?因为一旦我达到 运行,它就会多次完成所有工作(x
中的每一行一次)。此外,如果您尝试并行化工作,通常最好让 'worker' 在发送回数据之前尽可能多地完成工作。在您的代码中,您有两个连续的 foreach
调用,这会导致额外的通信开销。
我的做法是这样的:
- 将
xts
对象拆分为 N
垃圾,确保我们以 5 秒的间隔之一拆分。
- 让每个 worker 完成一个块的所有工作。
- 合并结果。如何选择
N
?
由于split.xts
用于第一步,每个块将具有相同数量的5秒间隔。但是,要完成的工作量(可能)更多地取决于数据点的数量,而不是 5 秒间隔的数量。因此,如果这些块之间的点分布不均匀,则使用更多的块和一些负载平衡可能是有意义的。如果点的分布是均匀的,使 N
尽可能大以最小化通信开销是有意义的。这里我采用后一种方法,即设置 N
等于核心数。
现在让我们生成一些示例数据并应用您的工作解决方案:
library(xts)
x <- xts(x = runif(100),
order.by = as.POSIXct("2018-01-01") + 0:99)
ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")
接下来我们设置并行集群:
library(doParallel)
library(foreach)
cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)
现在我们必须拆分 xts
对象。这里我首先确定了整个对象的时间跨度,并以N
5s的间隔分布。
N <- cores
k <- as.integer(ceiling(difftime(max(index(x)), min(index(x)), units = "secs") / (5 * N)))
接下来,我将 xts
对象拆分为 xts
个对象的列表,每个对象的长度大致相同:
split_x <- split(x, f = "secs", k = 5 * k)
现在我让 foreach
遍历这些块并合并结果:
m2 <- foreach(x = split_x, .packages = c("xts"), .combine = c) %dopar% {
ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")
m
}
stopCluster(cluster)
万岁,结果相等:
all.equal(m, m2)
#> [1] TRUE
period.apply()
这个问题的表现让我很郁闷。我的抑郁症变成了让它更快的痴迷。所以我用 C 重写了它。这是一个使用它并显示性能改进的示例。
library(xts) # need the GitHub development version
period_apply <- xts:::period_apply # not exported
set.seed(21)
x <- .xts(rnorm(1e7), 1:1e7)
e <- endpoints(x, "seconds", 5)
system.time(y <- period.apply(x, e, sum)) # current version
# user system elapsed
# 77.904 0.368 78.462
system.time(z <- period_apply(x, e, sum)) # new C version
# user system elapsed
# 15.468 0.232 15.741
all.equal(y, z)
# [1] TRUE
所以对于这个例子来说它快了大约 5 倍。还有一些事情可以让它变得更快,但 5x 是一个停下来展示它可以更好的好地方。如果您想(并且足够勇敢)尝试一下,请查看 latest development version。
我想在 R 中并行化一个 period.apply 函数,我正在尝试将 doParallel
与 Foreach
一起使用,但我不知道如何实现它功能。我正在使用的数据是一个带有日期时间索引和变量值的 xts
对象,我想做的是每 5 秒计算一次数据的平均值:
VAR
2018-01-01 00:00:00 1945.054
2018-01-01 00:00:02 1944.940
2018-01-01 00:00:05 1945.061
2018-01-01 00:00:07 1945.255
2018-01-01 00:00:10 1945.007
2018-01-01 00:00:12 1944.995
这是我编写的代码示例,但它不起作用:
library(xts)
library(doParallel)
library(foreach)
cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)
ends <- endpoints(x,"secs",5)
m <- foreach(i = 1:length(index(x))) %dopar% period.apply(x,ends,mean)
index(m) <- foreach(m) %dopar% trunc(index(m),"secs")
stopCluster()
有效的代码是这样的,但对于更大的数据库来说,它需要太多时间:
ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")
有没有更有效的方法?
提前致谢。
您是否在一些简单的数据集上尝试过您的代码?因为一旦我达到 运行,它就会多次完成所有工作(x
中的每一行一次)。此外,如果您尝试并行化工作,通常最好让 'worker' 在发送回数据之前尽可能多地完成工作。在您的代码中,您有两个连续的 foreach
调用,这会导致额外的通信开销。
我的做法是这样的:
- 将
xts
对象拆分为N
垃圾,确保我们以 5 秒的间隔之一拆分。 - 让每个 worker 完成一个块的所有工作。
- 合并结果。如何选择
N
?
由于split.xts
用于第一步,每个块将具有相同数量的5秒间隔。但是,要完成的工作量(可能)更多地取决于数据点的数量,而不是 5 秒间隔的数量。因此,如果这些块之间的点分布不均匀,则使用更多的块和一些负载平衡可能是有意义的。如果点的分布是均匀的,使 N
尽可能大以最小化通信开销是有意义的。这里我采用后一种方法,即设置 N
等于核心数。
现在让我们生成一些示例数据并应用您的工作解决方案:
library(xts)
x <- xts(x = runif(100),
order.by = as.POSIXct("2018-01-01") + 0:99)
ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")
接下来我们设置并行集群:
library(doParallel)
library(foreach)
cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)
现在我们必须拆分 xts
对象。这里我首先确定了整个对象的时间跨度,并以N
5s的间隔分布。
N <- cores
k <- as.integer(ceiling(difftime(max(index(x)), min(index(x)), units = "secs") / (5 * N)))
接下来,我将 xts
对象拆分为 xts
个对象的列表,每个对象的长度大致相同:
split_x <- split(x, f = "secs", k = 5 * k)
现在我让 foreach
遍历这些块并合并结果:
m2 <- foreach(x = split_x, .packages = c("xts"), .combine = c) %dopar% {
ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")
m
}
stopCluster(cluster)
万岁,结果相等:
all.equal(m, m2)
#> [1] TRUE
period.apply()
这个问题的表现让我很郁闷。我的抑郁症变成了让它更快的痴迷。所以我用 C 重写了它。这是一个使用它并显示性能改进的示例。
library(xts) # need the GitHub development version
period_apply <- xts:::period_apply # not exported
set.seed(21)
x <- .xts(rnorm(1e7), 1:1e7)
e <- endpoints(x, "seconds", 5)
system.time(y <- period.apply(x, e, sum)) # current version
# user system elapsed
# 77.904 0.368 78.462
system.time(z <- period_apply(x, e, sum)) # new C version
# user system elapsed
# 15.468 0.232 15.741
all.equal(y, z)
# [1] TRUE
所以对于这个例子来说它快了大约 5 倍。还有一些事情可以让它变得更快,但 5x 是一个停下来展示它可以更好的好地方。如果您想(并且足够勇敢)尝试一下,请查看 latest development version。