如何在不解压的情况下读取保存在 apache beam 云存储中的压缩 gzip csv 文件
How to read zipped gzip csv files saved in cloud storage in apache beam without extracting
我通过第三方将 GZIP 压缩的 csv 文件上传到云存储中。我正在使用 apache beam 连续流式传输管道来读取压缩文件元数据(文件名、完整路径)。我还有一个要求要读取此 csv 文件的第一行和最后一行。
我正在使用以下代码获取所有正在添加到云存储桶文件夹中的压缩文件。
pipeline.apply("MatchFile(s)", FileIO.match()
.filepattern(zipFilePath)
.continuously(Duration.standardMinutes(1), Watch.Growth.never()))
.apply(Window.<~>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("Get Compressed File(s)", ParDo.of(new GetCompressedFile()));
static class GetCompressedFile extends DoFn<MatchResult.Metadata, Void> {
@ProcessElement
public void processElement(ProcessContext context) throws ParseException {
ResourceId inputFile = context.element().resourceId();
String fileName = inputFile.getFilename();
String currentDirectoryPath = inputFile.getCurrentDirectory().toString();
我能够获得压缩文件名和路径,但我无法在不解压的情况下读取 csv 文件。我尝试了一些 google 答案来读取压缩文件,但不是从云存储中读取。
我不知道这是不是你想要的,但你可以这样阅读你的文件
pipeline.apply("MatchFile(s)", FileIO.match()
.filepattern(zipFilePath)
.continuously(Duration.standardMinutes(1), Watch.Growth.never()))
.apply(Window.<~>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(FileIO.readMatches().withCompression(Compression.GZIP))
.apply(ParDo.of(new DoFn<FileIO.ReadableFile, IdLine>() {
@ProcessElement
public void processElement(@Element FileIO.ReadableFile file, OutputReceiver<IdLine> out) throws IOException {
String content = file.readFullyAsUTF8String();
System.out.println(content);
BufferedReader br = new BufferedReader(Channels.newReader(file.open(), Charset.defaultCharset()));
String l;
StringBuffer stackLine = new StringBuffer();
while ((l = br.readLine()) != null) {
....
}
}
}));
我可以使用以下代码读取压缩文件而无需解压缩。可能会对某人有所帮助。
pipeline.apply("MatchFile(s)", FileIO.match()
.filepattern(zipFilePath)
.continuously(Duration.standardMinutes(1), Watch.Growth.never()))
.apply(Window.<MatchResult.Metadata>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(FileIO.readMatches().withCompression(GZIP))
.apply("Read Files",ParDo.of(new ReadFilesGZIP()));
pipeline.run();
}
static class ReadsFilesGZIP extends DoFn<FileIO.ReadableFile,String>{
@ProcessElement
public void processElement(ProcessContext context){
FileIO.ReadableFile file = context.element();
ReadableByteChannel readableByteChannel = file.getCompression().readDecompressed(FileSystems.open(file.getMetadata().resourceId()));
try (BufferedReader r = new BufferedReader(new InputStreamReader(Channels.newInputStream(readableByteChannel)))) {
String line;
Stream<String> fileLines = r.lines();
}
我通过第三方将 GZIP 压缩的 csv 文件上传到云存储中。我正在使用 apache beam 连续流式传输管道来读取压缩文件元数据(文件名、完整路径)。我还有一个要求要读取此 csv 文件的第一行和最后一行。 我正在使用以下代码获取所有正在添加到云存储桶文件夹中的压缩文件。
pipeline.apply("MatchFile(s)", FileIO.match()
.filepattern(zipFilePath)
.continuously(Duration.standardMinutes(1), Watch.Growth.never()))
.apply(Window.<~>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("Get Compressed File(s)", ParDo.of(new GetCompressedFile()));
static class GetCompressedFile extends DoFn<MatchResult.Metadata, Void> {
@ProcessElement
public void processElement(ProcessContext context) throws ParseException {
ResourceId inputFile = context.element().resourceId();
String fileName = inputFile.getFilename();
String currentDirectoryPath = inputFile.getCurrentDirectory().toString();
我能够获得压缩文件名和路径,但我无法在不解压的情况下读取 csv 文件。我尝试了一些 google 答案来读取压缩文件,但不是从云存储中读取。
我不知道这是不是你想要的,但你可以这样阅读你的文件
pipeline.apply("MatchFile(s)", FileIO.match()
.filepattern(zipFilePath)
.continuously(Duration.standardMinutes(1), Watch.Growth.never()))
.apply(Window.<~>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(FileIO.readMatches().withCompression(Compression.GZIP))
.apply(ParDo.of(new DoFn<FileIO.ReadableFile, IdLine>() {
@ProcessElement
public void processElement(@Element FileIO.ReadableFile file, OutputReceiver<IdLine> out) throws IOException {
String content = file.readFullyAsUTF8String();
System.out.println(content);
BufferedReader br = new BufferedReader(Channels.newReader(file.open(), Charset.defaultCharset()));
String l;
StringBuffer stackLine = new StringBuffer();
while ((l = br.readLine()) != null) {
....
}
}
}));
我可以使用以下代码读取压缩文件而无需解压缩。可能会对某人有所帮助。
pipeline.apply("MatchFile(s)", FileIO.match()
.filepattern(zipFilePath)
.continuously(Duration.standardMinutes(1), Watch.Growth.never()))
.apply(Window.<MatchResult.Metadata>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(FileIO.readMatches().withCompression(GZIP))
.apply("Read Files",ParDo.of(new ReadFilesGZIP()));
pipeline.run();
}
static class ReadsFilesGZIP extends DoFn<FileIO.ReadableFile,String>{
@ProcessElement
public void processElement(ProcessContext context){
FileIO.ReadableFile file = context.element();
ReadableByteChannel readableByteChannel = file.getCompression().readDecompressed(FileSystems.open(file.getMetadata().resourceId()));
try (BufferedReader r = new BufferedReader(new InputStreamReader(Channels.newInputStream(readableByteChannel)))) {
String line;
Stream<String> fileLines = r.lines();
}