有没有办法通过输入文件并行化 运行 R 脚本?
Is there a way to parallelize running R script by input files?
我正在读取一堆文件(超过七千个)作为数据框。所有文件都在同一个父文件夹中,具有有序且一致的子目录。这些文件目前按时间戳组织。我想读取其中的文件,然后将文件导出到不同的文件夹,其中每个文件都是一个播放器 ID。同一输入数据帧中的单个玩家可能有多个时间戳。有时玩家 ID 根本不在输入数据框中。我已经弄清楚了数据整理(这很简单),但是由于每个文件大约有 150 万行,因此单个文件大约需要 5 个小时。所以我不能简单地遍历所有七千个文件。我想通过输入文件并行化(尽管通过输出文件并行化可能更好?)。我将 运行 在具有足够 CPU 的 HPC 上执行此操作,并且我不需要在使用 HPC 之前指定我的 CPU 要求。我知道 doParallel 包存在,但教程介绍 vignette("gettingstartedParallel")
没有用,我不明白其他 doParallel 帖子。 (请不要只是在没有相关代码的情况下让我参考 doParallel 包。)我还担心代码崩溃,因为它试图多次写入同一个 csv。即使我设置 append = TRUE
,也无法并行写入 CSV。这是我如何读取文件以及如何整理文件并将其写入新文件夹的代码。
# Example input data frames (in the real code I create a vector of Alltimes using list.files() )
times1 <- data.frame(
ID = c('PL1', 'PL2', 'PL3', 'PL2','PL1'),
times = c(42.6, 41.5, 42.9, 47.0, 44.3),
speed = c(64, 66, 43, 39, 55)
)
times2 <- data.frame(
ID = c('PL3', 'PL3', 'PL3', 'PL1','PL1'),
times = c(62.1, 51.7, 65.9, 62.1, 55.3),
speed = c(71, 73, 45, 64, 66)
)
# Create vector of all parquets filepaths
Alltimes <- list.files(path = 'Input_Folder_Path)',
pattern = '*.snappy.parquet$',
recursive = TRUE,
full.names = TRUE)
# Iterate through timestamp input files (I want this part parallelized instead of a loop)
# for( i in 1:length(Alltimes)){
# Read in the individual file
# when the Alltimes vector is the file path I use read_parquet( Alltimes[i] ), but
# times1 is a substitute for this example.
df = times1
# df = times2
# df = read_parquet( Alltimes[i] )
# get vector of all player ids in this data frame
all_ids_vec <- unique(x = df$ID)
# write out individual csv for each player ID
for(j in 1:length(all_ids_vec)){
# Subset the df by that specific player ID
one_player <- df %>% filter(ID == all_ids_vec[j])
write.table(x = one_player,
file = "C:/Users/Juliet/Desktop/", all_ids_vec[j],".csv",
append = TRUE,
quote = FALSE,
sep = ",",
row.names = FALSE,
col.names = FALSE)
}
# }
要将文件并行写入磁盘,以下代码适用于 Windows。请注意,包 doParallel
不适用于 Windows。
library(parallel)
dirname <- "C:/Users/611913/Desktop"
#dirname <- path.expand("~/tmp/so")
sp <- split(df, df$ID)
id_vec <- names(sp)
f <- function(X, filename, path){
filename <- file.path(path, filename)
write.table(x = X,
file = filename,
append = TRUE,
quote = FALSE,
sep = ",",
row.names = FALSE,
col.names = FALSE)
}
# Windows
ncores <- detectCores()
cl <- makeCluster(ncores - 1L)
clusterExport(cl, "sp")
clusterExport(cl, "id_vec")
clusterExport(cl, "dirname")
clusterExport(cl, "f")
clusterEvalQ(cl, "sp")
clusterEvalQ(cl, "id_vec")
clusterEvalQ(cl, "dirname")
clusterEvalQ(cl, "f")
res <- parLapply(cl, seq_along(sp), function(j){
f(sp[[j]], id_vec[j], dirname)
})
stopCluster(cl)
测试数据
此代码创建测试数据。
n <- 1000L
ID <- sprintf("id%04d", seq_len(n))
ID <- rep(ID, 3)
df <- data.frame(ID, x = rnorm(3*n), y = sample(10, 3*n, TRUE))
我正在读取一堆文件(超过七千个)作为数据框。所有文件都在同一个父文件夹中,具有有序且一致的子目录。这些文件目前按时间戳组织。我想读取其中的文件,然后将文件导出到不同的文件夹,其中每个文件都是一个播放器 ID。同一输入数据帧中的单个玩家可能有多个时间戳。有时玩家 ID 根本不在输入数据框中。我已经弄清楚了数据整理(这很简单),但是由于每个文件大约有 150 万行,因此单个文件大约需要 5 个小时。所以我不能简单地遍历所有七千个文件。我想通过输入文件并行化(尽管通过输出文件并行化可能更好?)。我将 运行 在具有足够 CPU 的 HPC 上执行此操作,并且我不需要在使用 HPC 之前指定我的 CPU 要求。我知道 doParallel 包存在,但教程介绍 vignette("gettingstartedParallel")
没有用,我不明白其他 doParallel 帖子。 (请不要只是在没有相关代码的情况下让我参考 doParallel 包。)我还担心代码崩溃,因为它试图多次写入同一个 csv。即使我设置 append = TRUE
,也无法并行写入 CSV。这是我如何读取文件以及如何整理文件并将其写入新文件夹的代码。
# Example input data frames (in the real code I create a vector of Alltimes using list.files() )
times1 <- data.frame(
ID = c('PL1', 'PL2', 'PL3', 'PL2','PL1'),
times = c(42.6, 41.5, 42.9, 47.0, 44.3),
speed = c(64, 66, 43, 39, 55)
)
times2 <- data.frame(
ID = c('PL3', 'PL3', 'PL3', 'PL1','PL1'),
times = c(62.1, 51.7, 65.9, 62.1, 55.3),
speed = c(71, 73, 45, 64, 66)
)
# Create vector of all parquets filepaths
Alltimes <- list.files(path = 'Input_Folder_Path)',
pattern = '*.snappy.parquet$',
recursive = TRUE,
full.names = TRUE)
# Iterate through timestamp input files (I want this part parallelized instead of a loop)
# for( i in 1:length(Alltimes)){
# Read in the individual file
# when the Alltimes vector is the file path I use read_parquet( Alltimes[i] ), but
# times1 is a substitute for this example.
df = times1
# df = times2
# df = read_parquet( Alltimes[i] )
# get vector of all player ids in this data frame
all_ids_vec <- unique(x = df$ID)
# write out individual csv for each player ID
for(j in 1:length(all_ids_vec)){
# Subset the df by that specific player ID
one_player <- df %>% filter(ID == all_ids_vec[j])
write.table(x = one_player,
file = "C:/Users/Juliet/Desktop/", all_ids_vec[j],".csv",
append = TRUE,
quote = FALSE,
sep = ",",
row.names = FALSE,
col.names = FALSE)
}
# }
要将文件并行写入磁盘,以下代码适用于 Windows。请注意,包 doParallel
不适用于 Windows。
library(parallel)
dirname <- "C:/Users/611913/Desktop"
#dirname <- path.expand("~/tmp/so")
sp <- split(df, df$ID)
id_vec <- names(sp)
f <- function(X, filename, path){
filename <- file.path(path, filename)
write.table(x = X,
file = filename,
append = TRUE,
quote = FALSE,
sep = ",",
row.names = FALSE,
col.names = FALSE)
}
# Windows
ncores <- detectCores()
cl <- makeCluster(ncores - 1L)
clusterExport(cl, "sp")
clusterExport(cl, "id_vec")
clusterExport(cl, "dirname")
clusterExport(cl, "f")
clusterEvalQ(cl, "sp")
clusterEvalQ(cl, "id_vec")
clusterEvalQ(cl, "dirname")
clusterEvalQ(cl, "f")
res <- parLapply(cl, seq_along(sp), function(j){
f(sp[[j]], id_vec[j], dirname)
})
stopCluster(cl)
测试数据
此代码创建测试数据。
n <- 1000L
ID <- sprintf("id%04d", seq_len(n))
ID <- rep(ID, 3)
df <- data.frame(ID, x = rnorm(3*n), y = sample(10, 3*n, TRUE))