并行读取多个文件并提取数据
Read many files in parallel and extract data
我有 1000 个 json
文件。我想并行阅读它们。我有 4
CPU 个内核。
我有一个字符向量,其中包含以下所有文件的名称:-
cik_files <- list.files("./data/", pattern = ".json")
并使用此 vector
我加载文件并提取数据并将其添加到以下列表中:-
data <- list()
下面是提取数据的代码:-
for(i in 1:1000){
data1 <- fromJSON(paste0("./data/", cik_files[i]), flatten = TRUE)
if(("NetIncomeLoss" %in% names(data1$facts$`us-gaap`))){
data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
data1 <- data1[grep("CY20[0-9]{2}$", data1$frame), c(3, 9)]
try({if(nrow(data1) > 0){
data1$cik <- strtrim(cik_files[i], 13)
data[[length(data) + 1]] <- data1
}}, silent = TRUE)
}
}
然而,这会花费很多时间。所以我想知道如何 运行 for 循环中的代码,但并行。
提前致谢。
这里尝试解决问题中的问题。未经测试,因为没有数据。
第 1 步
首先把题中的循环重写成一个函数
f <- function(i, path = "./data", cik_files){
filename <- file.path(path, cik_files[i])
data1 <- fromJSON(filename, flatten = TRUE)
if(("NetIncomeLoss" %in% names(data1$facts$`us-gaap`))){
data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
found <- grep("CY20[0-9]{2}$", data1$frame)
if(length(found) > 0){
tryCatch({
out <- data1[found, c(3, 9)]
out$cik <- strtrim(cik_files[i], 13)
out
},
error = function(e) e,
warning = function(w) w)
} else NULL
} else NULL
}
第 2 步
现在加载包 parallel
和 运行 以下之一,取决于 OS。
library(parallel)
# Not on Windows
library(jsonlite)
json_list <- mclapply(seq_along(cik_files), f, cik_files = cik_files)
# Windows
ncores <- detectCores()
cl <- makeCluster(ncores - 1L)
clusterExport(cl, "cik_files")
clusterEvalQ(cl, "cik_files")
clusterEvalQ(cl, library(jsonlite))
json_list <- parLapply(cl, seq_along(cik_files), f, cik_files = cik_files)
stopCluster(cl)
第 3 步
从返回的列表中提取数据json_list
。
err <- sapply(json_list, inherits, "error")
warn <- sapply(json_list, inherits, "warning")
ok <- !(err | warn)
json_list[ok] # correctly read in
我有 1000 个 json
文件。我想并行阅读它们。我有 4
CPU 个内核。
我有一个字符向量,其中包含以下所有文件的名称:-
cik_files <- list.files("./data/", pattern = ".json")
并使用此 vector
我加载文件并提取数据并将其添加到以下列表中:-
data <- list()
下面是提取数据的代码:-
for(i in 1:1000){
data1 <- fromJSON(paste0("./data/", cik_files[i]), flatten = TRUE)
if(("NetIncomeLoss" %in% names(data1$facts$`us-gaap`))){
data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
data1 <- data1[grep("CY20[0-9]{2}$", data1$frame), c(3, 9)]
try({if(nrow(data1) > 0){
data1$cik <- strtrim(cik_files[i], 13)
data[[length(data) + 1]] <- data1
}}, silent = TRUE)
}
}
然而,这会花费很多时间。所以我想知道如何 运行 for 循环中的代码,但并行。
提前致谢。
这里尝试解决问题中的问题。未经测试,因为没有数据。
第 1 步
首先把题中的循环重写成一个函数
f <- function(i, path = "./data", cik_files){
filename <- file.path(path, cik_files[i])
data1 <- fromJSON(filename, flatten = TRUE)
if(("NetIncomeLoss" %in% names(data1$facts$`us-gaap`))){
data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
found <- grep("CY20[0-9]{2}$", data1$frame)
if(length(found) > 0){
tryCatch({
out <- data1[found, c(3, 9)]
out$cik <- strtrim(cik_files[i], 13)
out
},
error = function(e) e,
warning = function(w) w)
} else NULL
} else NULL
}
第 2 步
现在加载包 parallel
和 运行 以下之一,取决于 OS。
library(parallel)
# Not on Windows
library(jsonlite)
json_list <- mclapply(seq_along(cik_files), f, cik_files = cik_files)
# Windows
ncores <- detectCores()
cl <- makeCluster(ncores - 1L)
clusterExport(cl, "cik_files")
clusterEvalQ(cl, "cik_files")
clusterEvalQ(cl, library(jsonlite))
json_list <- parLapply(cl, seq_along(cik_files), f, cik_files = cik_files)
stopCluster(cl)
第 3 步
从返回的列表中提取数据json_list
。
err <- sapply(json_list, inherits, "error")
warn <- sapply(json_list, inherits, "warning")
ok <- !(err | warn)
json_list[ok] # correctly read in