有没有办法通过输入文件并行化 运行 R 脚本?

Is there a way to parallelize running R script by input files?

我正在读取一堆文件(超过七千个)作为数据框。所有文件都在同一个父文件夹中,具有有序且一致的子目录。这些文件目前按时间戳组织。我想读取其中的文件,然后将文件导出到不同的文件夹,其中每个文件都是一个播放器 ID。同一输入数据帧中的单个玩家可能有多个时间戳。有时玩家 ID 根本不在输入数据框中。我已经弄清楚了数据整理(这很简单),但是由于每个文件大约有 150 万行,因此单个文件大约需要 5 个小时。所以我不能简单地遍历所有七千个文件。我想通过输入文件并行化(尽管通过输出文件并行化可能更好?)。我将 运行 在具有足够 CPU 的 HPC 上执行此操作,并且我不需要在使用 HPC 之前指定我的 CPU 要求。我知道 doParallel 包存在,但教程介绍 vignette("gettingstartedParallel") 没有用,我不明白其他 doParallel 帖子。 (请不要只是在没有相关代码的情况下让我参考 doParallel 包。)我还担心代码崩溃,因为它试图多次写入同一个 csv。即使我设置 append = TRUE,也无法并行写入 CSV。这是我如何读取文件以及如何整理文件并将其写入新文件夹的代码。

# Example input data frames (in the real code I create a vector of Alltimes using list.files() )
times1 <- data.frame(
  ID = c('PL1', 'PL2', 'PL3', 'PL2','PL1'),
  times = c(42.6, 41.5, 42.9, 47.0, 44.3),
  speed = c(64, 66, 43, 39, 55) 
)

times2 <- data.frame(
  ID = c('PL3', 'PL3', 'PL3', 'PL1','PL1'),
  times   = c(62.1, 51.7, 65.9, 62.1, 55.3),
  speed = c(71, 73, 45, 64, 66) 
)

# Create vector of all parquets filepaths
Alltimes <- list.files(path = 'Input_Folder_Path)',
                       pattern = '*.snappy.parquet$',
                       recursive = TRUE,
                       full.names = TRUE)

# Iterate through timestamp input files (I want this part parallelized instead of a loop)
# for( i in 1:length(Alltimes)){
  
  # Read in the individual file
  # when the Alltimes vector is the file path I use read_parquet( Alltimes[i] ), but
  # times1 is a substitute for this example.
  df = times1 
  # df = times2
  # df = read_parquet( Alltimes[i] )
  
  # get vector of all player ids in this data frame
  all_ids_vec <- unique(x = df$ID)
  
  # write out individual csv for each player ID
  for(j in 1:length(all_ids_vec)){
    
    # Subset the df by that specific player ID
    one_player <- df %>% filter(ID == all_ids_vec[j])
    
    write.table(x = one_player, 
                file = "C:/Users/Juliet/Desktop/", all_ids_vec[j],".csv", 
                append = TRUE, 
                quote = FALSE, 
                sep = ",", 
                row.names = FALSE, 
                col.names = FALSE)
  }
# }

要将文件并行写入磁盘,以下代码适用于 Windows。请注意,包 doParallel 不适用于 Windows。

library(parallel)

dirname <- "C:/Users/611913/Desktop"
#dirname <- path.expand("~/tmp/so")

sp <- split(df, df$ID)
id_vec <- names(sp)

f <- function(X, filename, path){
  filename <- file.path(path, filename)
  write.table(x = X, 
              file = filename, 
              append = TRUE, 
              quote = FALSE, 
              sep = ",", 
              row.names = FALSE, 
              col.names = FALSE)  
}

# Windows
ncores <- detectCores()

cl <- makeCluster(ncores - 1L)
clusterExport(cl, "sp")
clusterExport(cl, "id_vec")
clusterExport(cl, "dirname")
clusterExport(cl, "f")

clusterEvalQ(cl, "sp")
clusterEvalQ(cl, "id_vec")
clusterEvalQ(cl, "dirname")
clusterEvalQ(cl, "f")

res <- parLapply(cl, seq_along(sp), function(j){
  f(sp[[j]], id_vec[j], dirname)
})

stopCluster(cl)

测试数据

此代码创建测试数据。

n <- 1000L
ID <- sprintf("id%04d", seq_len(n))
ID <- rep(ID, 3)
df <- data.frame(ID, x = rnorm(3*n), y = sample(10, 3*n, TRUE))