Dataflow GCP (Apache Beam) - 连续读取大量文件 (OutOfMemory)
Dataflow GCP (Apache Beam) - continuously reading big amount of files (OutOfMemory)
我想做的事情:
- 按模式连续读取并解压GZ文件(~3000个文件),解压后每个文件1.2MB和9MB
- 替换每个 CSV 文件中的一些字符序列
- 将CSV文件压缩到GZ并将修改后的文件保存到自己的路径。
实际代码:
static void run(final BeeswaxDataflowOptions options) {
final Pipeline pipeline = Pipeline.create(options);
final PCollection<MatchResult.Metadata> matches =
pipeline.apply(
"Read",
FileIO.match()
.filepattern(options.getSourcePath() + options.getSourceFilesPattern())
.continuously(
Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));
matches
.apply(FileIO.readMatches().withCompression(GZIP))
.apply(
Window.<FileIO.ReadableFile>into(
FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO)
.triggering(
Repeatedly.forever(AfterPane.elementCountAtLeast(1).getContinuationTrigger())))
.apply(
"Uncompress",
MapElements.into(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(
file -> {
final String filePath = file.getMetadata().resourceId().toString();
try {
return KV.of(filePath, file.readFullyAsUTF8String());
} catch (final IOException e) {
return KV.of(filePath, "");
}
}))
.apply("Prepare for BigQuery import", ParDo.of(new BigQueryDataPreparatorFn()))
.apply(
"Save results",
FileIO.<String, KV<String, String>>writeDynamic()
.withCompression(GZIP)
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.withNumShards(options.getShards())
.to(options.getOutputPath())
.withTempDirectory(options.getTempLocation())
.withNaming(AbsoluteNaming::new));
pipeline.run().waitUntilFinish();
问题出在 OutOfMemory 异常(是的,我知道 readFullyAsUTF8String 可能对此有疑问)。
遇到这种情况怎么办?
我的观察是在 "Uncompress" 步骤中读取和收集了所有 ~3000 个文件。
所以在进入 "Prepare for BigQuery import" 和 "Save results" 之前,它会以某种方式累积并读取到 RAM。
最好能以某种方式将此管道排队 - 就像最多 50 个元素通过步骤并等待结果然后开始下一步。
这可能吗?不一样怎么处理
您可以在这里做一些事情。
1:使用shuffle更均匀的分配文件。
final PCollection<MatchResult.Metadata> matches =
pipeline.apply(
"Read",
FileIO.match()
.filepattern(options.getSourcePath() + options.getSourceFilesPattern())
.continuously(
Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));
matches
.apply(Reshuffle.viaRandomKey())
.apply(FileIO.readMatches().withCompression(GZIP))
接下来您可以通过设置 --numberOfWorkerHarnessThreads
来限制每个 VM 处理的并发元素,但我认为这个问题应该通过重新洗牌来解决。
我想做的事情:
- 按模式连续读取并解压GZ文件(~3000个文件),解压后每个文件1.2MB和9MB
- 替换每个 CSV 文件中的一些字符序列
- 将CSV文件压缩到GZ并将修改后的文件保存到自己的路径。
实际代码:
static void run(final BeeswaxDataflowOptions options) {
final Pipeline pipeline = Pipeline.create(options);
final PCollection<MatchResult.Metadata> matches =
pipeline.apply(
"Read",
FileIO.match()
.filepattern(options.getSourcePath() + options.getSourceFilesPattern())
.continuously(
Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));
matches
.apply(FileIO.readMatches().withCompression(GZIP))
.apply(
Window.<FileIO.ReadableFile>into(
FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO)
.triggering(
Repeatedly.forever(AfterPane.elementCountAtLeast(1).getContinuationTrigger())))
.apply(
"Uncompress",
MapElements.into(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(
file -> {
final String filePath = file.getMetadata().resourceId().toString();
try {
return KV.of(filePath, file.readFullyAsUTF8String());
} catch (final IOException e) {
return KV.of(filePath, "");
}
}))
.apply("Prepare for BigQuery import", ParDo.of(new BigQueryDataPreparatorFn()))
.apply(
"Save results",
FileIO.<String, KV<String, String>>writeDynamic()
.withCompression(GZIP)
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.withNumShards(options.getShards())
.to(options.getOutputPath())
.withTempDirectory(options.getTempLocation())
.withNaming(AbsoluteNaming::new));
pipeline.run().waitUntilFinish();
问题出在 OutOfMemory 异常(是的,我知道 readFullyAsUTF8String 可能对此有疑问)。 遇到这种情况怎么办?
我的观察是在 "Uncompress" 步骤中读取和收集了所有 ~3000 个文件。 所以在进入 "Prepare for BigQuery import" 和 "Save results" 之前,它会以某种方式累积并读取到 RAM。
最好能以某种方式将此管道排队 - 就像最多 50 个元素通过步骤并等待结果然后开始下一步。 这可能吗?不一样怎么处理
您可以在这里做一些事情。
1:使用shuffle更均匀的分配文件。
final PCollection<MatchResult.Metadata> matches =
pipeline.apply(
"Read",
FileIO.match()
.filepattern(options.getSourcePath() + options.getSourceFilesPattern())
.continuously(
Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));
matches
.apply(Reshuffle.viaRandomKey())
.apply(FileIO.readMatches().withCompression(GZIP))
接下来您可以通过设置 --numberOfWorkerHarnessThreads
来限制每个 VM 处理的并发元素,但我认为这个问题应该通过重新洗牌来解决。