R foreach:并行读取和操作多个文件
R foreach: Read and manipulate multiple files in parallel
我有 500 个 tar.xz 文件,其中包含 2000 个 csv 文件。由于磁盘空间有限,我需要每次只解压几个 tar 文件,把其中的 csv 处理成 data.table,从磁盘上删除这些 csv 文件,并把结果保存为 RDS,然后再继续处理接下来的几个 tar 文件。
我的函数在串行模式下工作正常,但在并行模式下,不同核心(工作进程)之间的文件会被混在一起。这是为什么?
一些示例数据:
# Build five sample archives (seed_1.tar ... seed_5.tar). For each archive,
# five CSV files are written, each holding 50 rows drawn with replacement
# from `df`, and every CSV in the working directory is then packed with xz
# compression.
# NOTE(review): `df` is not defined in this snippet; it must be a data frame
# created beforehand (otherwise `df` resolves to stats::df and nrow() fails).
# NOTE(review): every pass over `j` overwrites the same seed_1.csv ...
# seed_5.csv names, and the archive keeps the ".tar" name although it is
# xz-compressed; both are kept as in the original example.
for (j in 1:5) {
  for (i in 1:5) {
    a <- df[sample.int(nrow(df), size = 50, replace = TRUE), ]
    write.csv(a, paste0("seed_", i, ".csv"))
  }
  # `pattern` is a regular expression: escape the dot and anchor the suffix so
  # only names ending in ".csv" are archived. Listing once after the inner
  # loop gives the same result as re-listing on every iteration.
  lf <- list.files(pattern = "\\.csv$")
  tar(
    tarfile = paste0("seed_", j, ".tar"),
    files = lf,
    compression = "xz",
    tar = "tar"
  )
}
使用 foreach 的示例代码
require(dplyr)
require(tidyr)
require(foreach)
require(doParallel)
require(magrittr)
# Reproduction of the reported problem: process every tar archive in the
# working directory in parallel (untar, read the CSVs, add two columns, save
# an .rds, delete the CSVs). The code is left exactly as posted.
# List all tar files in the working directory.
# NOTE(review): ".tar" is a regular expression, so the dot matches any
# character and the match is not anchored to the end of the file name.
list_of_files<-list.files(pattern = ".tar")
# Packages attached on each worker through foreach's .packages argument.
packsINeed<-c("vroom","magrittr","dplyr","tidyr","doParallel")
# Start a 6-worker PSOCK cluster and register it as the foreach backend.
# NOTE(review): no stopCluster() call is visible in this snippet.
myCluster<-makeCluster(6,type="PSOCK")
registerDoParallel(myCluster)
# One iteration per archive; iterations are dispatched with %dopar%.
foreach(i= 1:NROW(list_of_files),.packages = packsINeed)%dopar%{
print(paste(list_of_files[i], "which is", i, "of", NROW(list_of_files) ))
print("2. Untar .csv files inside")
# NOTE(review): every iteration extracts into the same "tempOutputFiles"
# directory, so concurrently running iterations see (and delete) each
# other's CSV files -- this looks like the source of the mixed-up files.
untar(tarfile = list_of_files[i], exdir = "tempOutputFiles")
print("#3. Read in files and add up two columns")
# NOTE(review): the whole string "tempOutputFiles/$.csv" is passed as the
# directory argument and no pattern is given; presumably
# list.files("tempOutputFiles", pattern = ...) was intended -- confirm.
# id="path" asks vroom to record the source file of each row in a `path` column.
df<-vroom::vroom(list.files("tempOutputFiles/$.csv"), id="path")
# Assumes the CSVs contain numeric columns B and C -- TODO confirm.
df$A<-df$B+df$C
print("#4. save RDS")
# Output name = archive name without its extension(s) plus ".rds", written to
# the working directory.
saveRDS(object = df, file = paste0(tools::file_path_sans_ext(list_of_files[i], compression = TRUE),".rds"))
print("#5. Clean up files")
# NOTE(review): list.files() returns names relative to "tempOutputFiles", but
# file.remove() resolves them against the working directory, so the
# extracted files are probably not the ones being removed -- confirm.
.files<-list.files("tempOutputFiles",pattern=".csv")
file.remove(basename(.files))
}
使用 mclapply - 行为相同
require(dplyr)
require(tidyr)
require(foreach)
require(doParallel)
require(magrittr)
# List all tar files in the working directory.
# NOTE(review): ".tar" is a regular expression, so the dot matches any
# character and the match is not anchored to the end of the file name.
list_of_files <- list.files(pattern = ".tar")

# Process one tar archive: extract it, read the extracted CSV files, add
# column A = B + C, save the result as "<archive name>.rds" and delete the
# CSV files.
#
# filename: path of the tar archive to process.
# Returns the value of the final file.remove() call.
#
# The definition line previously read `fun(<translated name>)`, which is not
# valid R and did not match the `filename` used in the body; it is restored to
# `function(filename)`. The remaining logic is kept as posted because it is
# the reproduction of the reported problem.
myParFun <- function(filename) {
  print(paste(filename))
  print("2. Untar all .csv files inside")
  # NOTE(review): every call extracts into the same "tempOutputFiles"
  # directory, so calls running in parallel see (and delete) each other's CSV
  # files -- this looks like the source of the mixed-up files.
  untar(tarfile = filename, exdir = "tempOutputFiles")
  print("#3. Read in files and add up two columns")
  # NOTE(review): the whole string is passed as the directory argument and no
  # pattern is given; presumably list.files("tempOutputFiles", pattern = ...)
  # was intended -- confirm.
  df <- vroom::vroom(list.files("tempOutputFiles/$.csv"), id = "path")
  # Assumes the CSVs contain numeric columns B and C -- TODO confirm.
  df$A <- df$B + df$C
  print("#4. save RDS")
  saveRDS(
    object = df,
    file = paste0(tools::file_path_sans_ext(filename, compression = TRUE), ".rds")
  )
  print("#5. Clean up files")
  # NOTE(review): the names returned here are relative to "tempOutputFiles",
  # but file.remove() resolves them against the working directory -- confirm.
  .files <- list.files("tempOutputFiles", pattern = ".csv")
  file.remove(.files)
}

# Run myParFun over every archive using 4 forked worker processes.
parallel::mclapply(list_of_files, myParFun, mc.cores = 4)
根据 Waldi 的评论,我为 list_of_files 中的每个文件创建了一个目录,现在可以正常工作了。但是有没有更简洁优雅的方法?例如使用 tempdir?
如评论中所建议,下面的代码为每个进程/tar 文件创建一个目录,将 tar 文件解压到其中,把 CSV 合并保存为 .rds 文件,然后删除这些 CSV。
请注意,vroom 似乎需要 altrep = FALSE 参数,才能避免在删除文件时出现 permission denied 错误。
# Create the test fixtures: four CSV files (two copies of mtcars, two copies
# of iris) packed pairwise into two tar archives in the working directory.
sample_tables <- list(
  "file1.csv" = mtcars,
  "file2.csv" = mtcars,
  "file3.csv" = iris,
  "file4.csv" = iris
)
for (csv_name in names(sample_tables)) {
  write.csv(sample_tables[[csv_name]], csv_name)
}
# tar = "tar" selects the external tar program, as in the original calls.
tar("tar1.tar", files = c("file1.csv", "file2.csv"), tar = "tar")
tar("tar2.tar", files = c("file3.csv", "file4.csv"), tar = "tar")
require(dplyr)
require(tidyr)
require(foreach)
require(doParallel)
require(magrittr)
# List every tar archive in the working directory. `pattern` is a regular
# expression, so the dot is escaped (written "\\." inside an R string) and the
# match is anchored to the end of the name; an optional compression suffix
# (.gz, .bz2, .xz) is accepted, matching what file_path_sans_ext() strips
# below.
list_of_files <- list.files(pattern = "\\.tar(\\.(gz|bz2|xz))?$")

# Packages attached on each worker through foreach's .packages argument.
packsINeed <- c("vroom", "magrittr", "dplyr", "tidyr", "doParallel")

# Start a two-worker PSOCK cluster and register it as the foreach backend.
myCluster <- makeCluster(2, type = "PSOCK")
registerDoParallel(myCluster)

# One iteration per archive. Each archive is extracted into its own directory
# ("temp<archive name>"), so parallel iterations never share files.
foreach(i = seq_along(list_of_files), .packages = packsINeed) %dopar% {
  print(paste(list_of_files[i], "which is", i, "of", NROW(list_of_files)))
  print("2. Untar .csv files inside")
  # Archive name without ".tar" and without any compression suffix.
  fileout <- tools::file_path_sans_ext(list_of_files[i], compression = TRUE)
  exdir <- paste0("temp", fileout)
  untar(tarfile = list_of_files[i], exdir = exdir)

  print("#3. Read in files and add up two columns")
  # Full paths of the extracted CSV files; listed once and reused for both
  # reading and clean-up.
  csv_files <- list.files(exdir, pattern = "\\.csv$", full.names = TRUE)
  # altrep = FALSE is kept from the original; it is reported to avoid a
  # "permission denied" error when the CSV files are deleted afterwards.
  df <- vroom::vroom(csv_files, altrep = FALSE)
  # df$A<-df$B+df$C # These columns don't exist in mtcars used as example

  print("#4. save RDS")
  # The .rds is written next to the extracted files, inside `exdir`.
  saveRDS(object = df, file = file.path(exdir, paste0(fileout, ".rds")))

  print("#5. Clean up files")
  file.remove(csv_files)
}

# Shut the worker processes down once all archives have been handled.
parallel::stopCluster(myCluster)
不确定.rds应该去哪里,所以暂时放在临时文件夹里。
我有 500 个 tar.xz 文件,其中包含 2000 个 csv 文件。由于磁盘空间有限,我需要每次只解压几个 tar 文件,把其中的 csv 处理成 data.table,从磁盘上删除这些 csv 文件,并把结果保存为 RDS,然后再继续处理接下来的几个 tar 文件。
我的函数在串行模式下工作正常,但在并行模式下,不同核心(工作进程)之间的文件会被混在一起。这是为什么?
一些示例数据:
# Build five sample archives (seed_1.tar ... seed_5.tar). For each archive,
# five CSV files are written, each holding 50 rows drawn with replacement
# from `df`, and every CSV in the working directory is then packed with xz
# compression.
# NOTE(review): `df` is not defined in this snippet; it must be a data frame
# created beforehand (otherwise `df` resolves to stats::df and nrow() fails).
# NOTE(review): every pass over `j` overwrites the same seed_1.csv ...
# seed_5.csv names, and the archive keeps the ".tar" name although it is
# xz-compressed; both are kept as in the original example.
for (j in 1:5) {
  for (i in 1:5) {
    a <- df[sample.int(nrow(df), size = 50, replace = TRUE), ]
    write.csv(a, paste0("seed_", i, ".csv"))
  }
  # `pattern` is a regular expression: escape the dot and anchor the suffix so
  # only names ending in ".csv" are archived. Listing once after the inner
  # loop gives the same result as re-listing on every iteration.
  lf <- list.files(pattern = "\\.csv$")
  tar(
    tarfile = paste0("seed_", j, ".tar"),
    files = lf,
    compression = "xz",
    tar = "tar"
  )
}
使用 foreach 的示例代码
require(dplyr)
require(tidyr)
require(foreach)
require(doParallel)
require(magrittr)
# Reproduction of the reported problem: process every tar archive in the
# working directory in parallel (untar, read the CSVs, add two columns, save
# an .rds, delete the CSVs). The code is left exactly as posted.
# List all tar files in the working directory.
# NOTE(review): ".tar" is a regular expression, so the dot matches any
# character and the match is not anchored to the end of the file name.
list_of_files<-list.files(pattern = ".tar")
# Packages attached on each worker through foreach's .packages argument.
packsINeed<-c("vroom","magrittr","dplyr","tidyr","doParallel")
# Start a 6-worker PSOCK cluster and register it as the foreach backend.
# NOTE(review): no stopCluster() call is visible in this snippet.
myCluster<-makeCluster(6,type="PSOCK")
registerDoParallel(myCluster)
# One iteration per archive; iterations are dispatched with %dopar%.
foreach(i= 1:NROW(list_of_files),.packages = packsINeed)%dopar%{
print(paste(list_of_files[i], "which is", i, "of", NROW(list_of_files) ))
print("2. Untar .csv files inside")
# NOTE(review): every iteration extracts into the same "tempOutputFiles"
# directory, so concurrently running iterations see (and delete) each
# other's CSV files -- this looks like the source of the mixed-up files.
untar(tarfile = list_of_files[i], exdir = "tempOutputFiles")
print("#3. Read in files and add up two columns")
# NOTE(review): the whole string "tempOutputFiles/$.csv" is passed as the
# directory argument and no pattern is given; presumably
# list.files("tempOutputFiles", pattern = ...) was intended -- confirm.
# id="path" asks vroom to record the source file of each row in a `path` column.
df<-vroom::vroom(list.files("tempOutputFiles/$.csv"), id="path")
# Assumes the CSVs contain numeric columns B and C -- TODO confirm.
df$A<-df$B+df$C
print("#4. save RDS")
# Output name = archive name without its extension(s) plus ".rds", written to
# the working directory.
saveRDS(object = df, file = paste0(tools::file_path_sans_ext(list_of_files[i], compression = TRUE),".rds"))
print("#5. Clean up files")
# NOTE(review): list.files() returns names relative to "tempOutputFiles", but
# file.remove() resolves them against the working directory, so the
# extracted files are probably not the ones being removed -- confirm.
.files<-list.files("tempOutputFiles",pattern=".csv")
file.remove(basename(.files))
}
使用 mclapply - 行为相同
require(dplyr)
require(tidyr)
require(foreach)
require(doParallel)
require(magrittr)
# List all tar files in the working directory.
# NOTE(review): ".tar" is a regular expression, so the dot matches any
# character and the match is not anchored to the end of the file name.
list_of_files <- list.files(pattern = ".tar")

# Process one tar archive: extract it, read the extracted CSV files, add
# column A = B + C, save the result as "<archive name>.rds" and delete the
# CSV files.
#
# filename: path of the tar archive to process.
# Returns the value of the final file.remove() call.
#
# The definition line previously read `fun(<translated name>)`, which is not
# valid R and did not match the `filename` used in the body; it is restored to
# `function(filename)`. The remaining logic is kept as posted because it is
# the reproduction of the reported problem.
myParFun <- function(filename) {
  print(paste(filename))
  print("2. Untar all .csv files inside")
  # NOTE(review): every call extracts into the same "tempOutputFiles"
  # directory, so calls running in parallel see (and delete) each other's CSV
  # files -- this looks like the source of the mixed-up files.
  untar(tarfile = filename, exdir = "tempOutputFiles")
  print("#3. Read in files and add up two columns")
  # NOTE(review): the whole string is passed as the directory argument and no
  # pattern is given; presumably list.files("tempOutputFiles", pattern = ...)
  # was intended -- confirm.
  df <- vroom::vroom(list.files("tempOutputFiles/$.csv"), id = "path")
  # Assumes the CSVs contain numeric columns B and C -- TODO confirm.
  df$A <- df$B + df$C
  print("#4. save RDS")
  saveRDS(
    object = df,
    file = paste0(tools::file_path_sans_ext(filename, compression = TRUE), ".rds")
  )
  print("#5. Clean up files")
  # NOTE(review): the names returned here are relative to "tempOutputFiles",
  # but file.remove() resolves them against the working directory -- confirm.
  .files <- list.files("tempOutputFiles", pattern = ".csv")
  file.remove(.files)
}

# Run myParFun over every archive using 4 forked worker processes.
parallel::mclapply(list_of_files, myParFun, mc.cores = 4)
根据 Waldi 的评论,我为 list_of_files 中的每个文件创建了一个目录,现在可以正常工作了。但是有没有更简洁优雅的方法?例如使用 tempdir?
如评论中所建议,下面的代码为每个进程/tar 文件创建一个目录,将 tar 文件解压到其中,把 CSV 合并保存为 .rds 文件,然后删除这些 CSV。
请注意,vroom 似乎需要 altrep = FALSE 参数,才能避免在删除文件时出现 permission denied 错误。
# Create the test fixtures: four CSV files (two copies of mtcars, two copies
# of iris) packed pairwise into two tar archives in the working directory.
sample_tables <- list(
  "file1.csv" = mtcars,
  "file2.csv" = mtcars,
  "file3.csv" = iris,
  "file4.csv" = iris
)
for (csv_name in names(sample_tables)) {
  write.csv(sample_tables[[csv_name]], csv_name)
}
# tar = "tar" selects the external tar program, as in the original calls.
tar("tar1.tar", files = c("file1.csv", "file2.csv"), tar = "tar")
tar("tar2.tar", files = c("file3.csv", "file4.csv"), tar = "tar")
require(dplyr)
require(tidyr)
require(foreach)
require(doParallel)
require(magrittr)
# List every tar archive in the working directory. `pattern` is a regular
# expression, so the dot is escaped (written "\\." inside an R string) and the
# match is anchored to the end of the name; an optional compression suffix
# (.gz, .bz2, .xz) is accepted, matching what file_path_sans_ext() strips
# below.
list_of_files <- list.files(pattern = "\\.tar(\\.(gz|bz2|xz))?$")

# Packages attached on each worker through foreach's .packages argument.
packsINeed <- c("vroom", "magrittr", "dplyr", "tidyr", "doParallel")

# Start a two-worker PSOCK cluster and register it as the foreach backend.
myCluster <- makeCluster(2, type = "PSOCK")
registerDoParallel(myCluster)

# One iteration per archive. Each archive is extracted into its own directory
# ("temp<archive name>"), so parallel iterations never share files.
foreach(i = seq_along(list_of_files), .packages = packsINeed) %dopar% {
  print(paste(list_of_files[i], "which is", i, "of", NROW(list_of_files)))
  print("2. Untar .csv files inside")
  # Archive name without ".tar" and without any compression suffix.
  fileout <- tools::file_path_sans_ext(list_of_files[i], compression = TRUE)
  exdir <- paste0("temp", fileout)
  untar(tarfile = list_of_files[i], exdir = exdir)

  print("#3. Read in files and add up two columns")
  # Full paths of the extracted CSV files; listed once and reused for both
  # reading and clean-up.
  csv_files <- list.files(exdir, pattern = "\\.csv$", full.names = TRUE)
  # altrep = FALSE is kept from the original; it is reported to avoid a
  # "permission denied" error when the CSV files are deleted afterwards.
  df <- vroom::vroom(csv_files, altrep = FALSE)
  # df$A<-df$B+df$C # These columns don't exist in mtcars used as example

  print("#4. save RDS")
  # The .rds is written next to the extracted files, inside `exdir`.
  saveRDS(object = df, file = file.path(exdir, paste0(fileout, ".rds")))

  print("#5. Clean up files")
  file.remove(csv_files)
}

# Shut the worker processes down once all archives have been handled.
parallel::stopCluster(myCluster)
不确定.rds应该去哪里,所以暂时放在临时文件夹里。