从 Spark 中的多个 .gz 文件中读取特定文件
Read specific file from multiple .gz file in Spark
我正在尝试读取一个文件夹中多个 .gz 文件中存在的具有特定名称的文件。
例如
D:/sample_datasets/gzfiles
|-my_file_1.tar.gz
|-my_file_1.tar
|-file1.csv
|-file2.csv
|-file3.csv
|-my_file_2.tar.gz
|-my_file_2.tar
|-file1.csv
|-file2.csv
|-file3.csv
我只对阅读 file1.csv
的内容感兴趣,它在所有 .gz
文件中具有相同的架构。
我正在传递路径 D:/sample_datasets/gzfiles
到 JavaSparkContext
中的 wholeTextFiles()
方法。但是,它 returns tar 即所有文件的内容。 file1.csv, file2.csv, file3.csv.
有没有办法只能读取Dataset或RDD中file1.csv
的内容。提前致谢!
在路径的末尾使用 *.gz
。
希望对您有所帮助!
我能够使用我在 SO
上的多个答案中使用的以下片段来执行该过程
JavaPairRDD tarData = sparkContext.binaryFiles("D:/sample_datasets/gzfiles/*.tar.gz");
JavaRDD tarRecords = tarData.flatMap(new FlatMapFunction, Row>(){
private static final long serialVersionUID = 1L;
@Override
public Iterator call(Tuple2 t) throws Exception {
TsvParserSettings settings = new TsvParserSettings();
TsvParser parser = new TsvParser(settings);
List records = new ArrayList();
TarArchiveInputStream tarInput = new TarArchiveInputStream(new GzipCompressorInputStream(t._2.open()));
TarArchiveEntry entry;
while((entry = tarInput.getNextTarEntry()) != null) {
if(entry.getName().equals("file1.csv")) {
InputStreamReader streamReader = new InputStreamReader(tarInput);
BufferedReader reader = new BufferedReader(streamReader);
String line;
while((line = reader.readLine())!= null) {
String [] parsedLine = parser.parseLine(line);
Row row = RowFactory.create(parsedLine);
records.add(row);
}
reader.close();
break;
}
}
tarInput.close();
return records.iterator();
}
});
我正在尝试读取一个文件夹中多个 .gz 文件中存在的具有特定名称的文件。
例如
D:/sample_datasets/gzfiles
|-my_file_1.tar.gz
|-my_file_1.tar
|-file1.csv
|-file2.csv
|-file3.csv
|-my_file_2.tar.gz
|-my_file_2.tar
|-file1.csv
|-file2.csv
|-file3.csv
我只对阅读 file1.csv
的内容感兴趣,它在所有 .gz
文件中具有相同的架构。
我正在传递路径 D:/sample_datasets/gzfiles
到 JavaSparkContext
中的 wholeTextFiles()
方法。但是,它 returns tar 即所有文件的内容。 file1.csv, file2.csv, file3.csv.
有没有办法只能读取Dataset或RDD中file1.csv
的内容。提前致谢!
在路径的末尾使用 *.gz
。
希望对您有所帮助!
我能够使用我在 SO
上的多个答案中使用的以下片段来执行该过程
JavaPairRDD tarData = sparkContext.binaryFiles("D:/sample_datasets/gzfiles/*.tar.gz");
JavaRDD tarRecords = tarData.flatMap(new FlatMapFunction, Row>(){
private static final long serialVersionUID = 1L;
@Override
public Iterator call(Tuple2 t) throws Exception {
TsvParserSettings settings = new TsvParserSettings();
TsvParser parser = new TsvParser(settings);
List records = new ArrayList();
TarArchiveInputStream tarInput = new TarArchiveInputStream(new GzipCompressorInputStream(t._2.open()));
TarArchiveEntry entry;
while((entry = tarInput.getNextTarEntry()) != null) {
if(entry.getName().equals("file1.csv")) {
InputStreamReader streamReader = new InputStreamReader(tarInput);
BufferedReader reader = new BufferedReader(streamReader);
String line;
while((line = reader.readLine())!= null) {
String [] parsedLine = parser.parseLine(line);
Row row = RowFactory.create(parsedLine);
records.add(row);
}
reader.close();
break;
}
}
tarInput.close();
return records.iterator();
}
});