使用需要并行写入数据帧的 parSapply
Using parSapply that needs to write to a data frame in parallel
我有一个自定义函数,它遍历时间序列数据框和 returns 从原始时间序列滑动 window 30 分钟。从这 30 分钟开始,该函数将开始和结束时间戳以及此滑动的最小值和最大值写入另一个数据帧 window。
之后使用 sapply 语句使该函数在整个数据范围内递归。
应用太慢但有效。我希望能够并行化 sapply,但是当我这样做时代码 returns 错误。我把这归因于将函数的最终结果并行写入同一个数据帧的要求。
result_df_2 <- data.frame(Start.time=as.POSIXct(character()), finish.time = as.POSIXct(character()), max.value = double(), min.value = double(), stringsAsFactors = FALSE)
sliding_window <- function(sequence, time_row, Query, window_width) {
sliding_window_1 <- Query[time_row <= (time_row[sequence] + window_width * 60 + 1 * 60) &
time_row > time_row[sequence], ]
if (nrow(sliding_window_1) >= 1) {
temp.df <- data.frame(Start.time = sliding_window_1$TIME[1],
finish.time = sliding_window_1$TIME[nrow(sliding_window_1)],
max.value = max(sliding_window_1$C19.X.AAA.01, na.rm = T),
min.value = min(sliding_window_1$C19.X.AAA.01, na.rm = T))
result_df_2[nrow(result_df_2)+1,] <<- temp.df[1,]
}
}
sapply(1:(nrow(WBP) - 30), FUN = sliding_window, Query = WBP, time_row = WBP$TIME, window_width = 30)
这个问题中的一个答案提到了这样的场景是可以并行化的。我需要你的帮助才能知道怎么做。
下面是dput(WBP[1:10,])
的输出
structure(list(TIME = structure(c(1484589600, 1484589660, 1484589720,
1484589780, 1484589840, 1484589900, 1484589960, 1484590020, 1484590080,
1484590140), class = c("POSIXct", "POSIXt"), tzone = ""), C19.X = c(216.193,
220.204, 218.845, 218.676, 219.194, 219.976, 219.894, 219.168,
216.713, 216.551), C19.N = c(214.201, 216.985, 218.15, 217.3,
218.11, 218.194, 218.332, 216.679, 215.343, 215.403), C19.X.AA.01 = c(216.193,
220.204, NA, NA, NA, NA, NA, NA, NA, 216.551), C19.X.AAA.01 = c(216.193,
220.204, 219.747375, 219.29075, 218.834125, 218.3775, 217.920875,
217.46425, 217.007625, 216.551)), .Names = c("TIME", "C19.X",
"C19.N", "C19.X.AA.01", "C19.X.AAA.01"), row.names = c(NA, 10L
), class = c("data.table", "data.frame"))
技巧 1:使用 "pure" 没有副作用的函数
在您的示例中,您初始化 return_df_2
并使用该函数对其进行更改。这并不是 sapply
之类的东西的真正用途(这也是它不能很好地并行化的原因之一)。相反,尝试使函数 return 成为您想要的结果,然后将所有答案放入 data.frame 中。例如,
f <- function(x) {
x / 10 # this returns a value instead of modifying something that already exists
}
result <- sapply(1:5, FUN = f)
data.frame(result)
## result
## 1 0.1
## 2 0.2
## 3 0.3
## 4 0.4
## 5 0.5
提示 2:并行化可能很困难
因为 child 进程并不总是可以访问 parent 进程中定义的内容。在这种情况下,你得到 'result_df_2' not found
因为 children 没有通过那个。您可以使用上面的策略跳过该错误(但如果您有更复杂的功能,您将来可能会 运行 遇到那个问题,所以这只是 FYI)。这是一个使用 parSapply
:
的简单示例
library(parallel)
cl <- makeCluster(2)
result <- parSapply(cl, 1:5, f)
stopCluster(cl)
data.frame(result)
## result
## 1 0.1
## 2 0.2
## 3 0.3
## 4 0.4
## 5 0.5
可能的解决方案(但没有真实数据很难判断)
而不是修改data.frame,而是使用函数return temp.df
然后使用dplyr::bind_rows
或类似的方法来转换returned 列表将数据帧合并为一个数据帧(或者您希望解决方案看起来如何)。
sliding_window <- function(sequence, time_row, Query, window_width) {
sliding_window_1 <- Query[time_row <= (time_row[sequence] + window_width * 60 + 1 * 60) &
time_row > time_row[sequence], ]
if (nrow(sliding_window_1) >= 1) {
temp.df <- data.frame(Start.time = sliding_window_1$TIME[1],
finish.time = sliding_window_1$TIME[nrow(sliding_window_1)],
max.value = max(sliding_window_1$C19.X.AAA.01, na.rm = T),
min.value = min(sliding_window_1$C19.X.AAA.01, na.rm = T))
}
else {
temp.df <- data.frame(Start.time=as.POSIXct(character()), finish.time = as.POSIXct(character()), max.value = double(), min.value = double(), stringsAsFactors = FALSE)
}
temp.df
}
好的,加油。祝你好运。
我有一个自定义函数,它遍历时间序列数据框和 returns 从原始时间序列滑动 window 30 分钟。从这 30 分钟开始,该函数将开始和结束时间戳以及此滑动的最小值和最大值写入另一个数据帧 window。
之后使用 sapply 语句使该函数在整个数据范围内递归。
应用太慢但有效。我希望能够并行化 sapply,但是当我这样做时代码 returns 错误。我把这归因于将函数的最终结果并行写入同一个数据帧的要求。
result_df_2 <- data.frame(Start.time=as.POSIXct(character()), finish.time = as.POSIXct(character()), max.value = double(), min.value = double(), stringsAsFactors = FALSE)
sliding_window <- function(sequence, time_row, Query, window_width) {
sliding_window_1 <- Query[time_row <= (time_row[sequence] + window_width * 60 + 1 * 60) &
time_row > time_row[sequence], ]
if (nrow(sliding_window_1) >= 1) {
temp.df <- data.frame(Start.time = sliding_window_1$TIME[1],
finish.time = sliding_window_1$TIME[nrow(sliding_window_1)],
max.value = max(sliding_window_1$C19.X.AAA.01, na.rm = T),
min.value = min(sliding_window_1$C19.X.AAA.01, na.rm = T))
result_df_2[nrow(result_df_2)+1,] <<- temp.df[1,]
}
}
sapply(1:(nrow(WBP) - 30), FUN = sliding_window, Query = WBP, time_row = WBP$TIME, window_width = 30)
这个问题
下面是dput(WBP[1:10,])
的输出structure(list(TIME = structure(c(1484589600, 1484589660, 1484589720,
1484589780, 1484589840, 1484589900, 1484589960, 1484590020, 1484590080,
1484590140), class = c("POSIXct", "POSIXt"), tzone = ""), C19.X = c(216.193,
220.204, 218.845, 218.676, 219.194, 219.976, 219.894, 219.168,
216.713, 216.551), C19.N = c(214.201, 216.985, 218.15, 217.3,
218.11, 218.194, 218.332, 216.679, 215.343, 215.403), C19.X.AA.01 = c(216.193,
220.204, NA, NA, NA, NA, NA, NA, NA, 216.551), C19.X.AAA.01 = c(216.193,
220.204, 219.747375, 219.29075, 218.834125, 218.3775, 217.920875,
217.46425, 217.007625, 216.551)), .Names = c("TIME", "C19.X",
"C19.N", "C19.X.AA.01", "C19.X.AAA.01"), row.names = c(NA, 10L
), class = c("data.table", "data.frame"))
技巧 1:使用 "pure" 没有副作用的函数
在您的示例中,您初始化 return_df_2
并使用该函数对其进行更改。这并不是 sapply
之类的东西的真正用途(这也是它不能很好地并行化的原因之一)。相反,尝试使函数 return 成为您想要的结果,然后将所有答案放入 data.frame 中。例如,
f <- function(x) {
x / 10 # this returns a value instead of modifying something that already exists
}
result <- sapply(1:5, FUN = f)
data.frame(result)
## result
## 1 0.1
## 2 0.2
## 3 0.3
## 4 0.4
## 5 0.5
提示 2:并行化可能很困难
因为 child 进程并不总是可以访问 parent 进程中定义的内容。在这种情况下,你得到 'result_df_2' not found
因为 children 没有通过那个。您可以使用上面的策略跳过该错误(但如果您有更复杂的功能,您将来可能会 运行 遇到那个问题,所以这只是 FYI)。这是一个使用 parSapply
:
library(parallel)
cl <- makeCluster(2)
result <- parSapply(cl, 1:5, f)
stopCluster(cl)
data.frame(result)
## result
## 1 0.1
## 2 0.2
## 3 0.3
## 4 0.4
## 5 0.5
可能的解决方案(但没有真实数据很难判断)
而不是修改data.frame,而是使用函数return temp.df
然后使用dplyr::bind_rows
或类似的方法来转换returned 列表将数据帧合并为一个数据帧(或者您希望解决方案看起来如何)。
sliding_window <- function(sequence, time_row, Query, window_width) {
sliding_window_1 <- Query[time_row <= (time_row[sequence] + window_width * 60 + 1 * 60) &
time_row > time_row[sequence], ]
if (nrow(sliding_window_1) >= 1) {
temp.df <- data.frame(Start.time = sliding_window_1$TIME[1],
finish.time = sliding_window_1$TIME[nrow(sliding_window_1)],
max.value = max(sliding_window_1$C19.X.AAA.01, na.rm = T),
min.value = min(sliding_window_1$C19.X.AAA.01, na.rm = T))
}
else {
temp.df <- data.frame(Start.time=as.POSIXct(character()), finish.time = as.POSIXct(character()), max.value = double(), min.value = double(), stringsAsFactors = FALSE)
}
temp.df
}
好的,加油。祝你好运。