并行读取多个文件并提取数据

Read many files in parallel and extract data

我有 1000 个 json 文件。我想并行阅读它们。我有 4 CPU 个内核。

我有一个字符向量,其中包含以下所有文件的名称:-

cik_files <- list.files("./data/", pattern = ".json")

并使用此 vector 我加载文件并提取数据并将其添加到以下列表中:-

data <- list()

下面是提取数据的代码:-

for(i in 1:1000){
  data1 <- fromJSON(paste0("./data/", cik_files[i]), flatten = TRUE)
  if(("NetIncomeLoss" %in% names(data1$facts$`us-gaap`))){
    data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
    data1 <- data1[grep("CY20[0-9]{2}$", data1$frame), c(3, 9)]
    try({if(nrow(data1) > 0){
      data1$cik <- strtrim(cik_files[i], 13)
      data[[length(data) + 1]] <- data1
    }}, silent = TRUE)
  }
}

然而,这会花费很多时间。所以我想知道如何 运行 for 循环中的代码,但并行。

提前致谢。

这里尝试解决问题中的问题。未经测试,因为没有数据。

第 1 步

首先把题中的循环重写成一个函数

f <- function(i, path = "./data", cik_files){
  filename <- file.path(path, cik_files[i])
  data1 <- fromJSON(filename, flatten = TRUE)
  if(("NetIncomeLoss" %in% names(data1$facts$`us-gaap`))){
    data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
    found <- grep("CY20[0-9]{2}$", data1$frame)
    if(length(found) > 0){
      tryCatch({
        out <- data1[found, c(3, 9)]
        out$cik <- strtrim(cik_files[i], 13)
        out
      },
      error = function(e) e,
      warning = function(w) w)
    } else NULL
  } else NULL
}

第 2 步

现在加载包 parallel 和 运行 以下之一,取决于 OS。

library(parallel)

# Not on Windows
library(jsonlite)
json_list <- mclapply(seq_along(cik_files), f, cik_files = cik_files)

# Windows
ncores <- detectCores()
cl <- makeCluster(ncores - 1L)
clusterExport(cl, "cik_files")
clusterEvalQ(cl, "cik_files")
clusterEvalQ(cl, library(jsonlite))

json_list <- parLapply(cl, seq_along(cik_files), f, cik_files = cik_files)

stopCluster(cl)

第 3 步

从返回的列表中提取数据json_list

err <- sapply(json_list, inherits, "error")
warn <- sapply(json_list, inherits, "warning")
ok <- !(err | warn)
json_list[ok]  # correctly read in