Apache Beam TextIO.ReadAll(),处理丢失的文件名?
Apache Beam TextIO.ReadAll(), handle missing filenames?
我有一个管道步骤块,可以从 GCS 存储桶中的特定文件中读取数据。代码类似于以下内容:
List<String> filepaths = new ArrayList<String>("filepath1", "filepath2", "filepathMissing");
return pipeline
.apply("GatherFiles", Create.of(filepaths)).setCoder(StringUtf8Coder.of())
.apply("GatherFileData", TextIO.readAll())
.apply("ApplyCustomDoFn", ParDo.of(new CustomDoFn()))
.apply("Group", GroupByKey.<String, String>create())
.apply("AnotherCustomDoFn", ParDo.of(new AnotherCustomDoFn()));
如果 GCS 中缺少其中一个文件路径(例如,缺少 "filepathMissing"),整个管道就会崩溃。我正在考虑围绕这组管道步骤添加一个 try/catch,但我不确定这样做的后果。
我的问题:
- 使用
TextIO.readAll()
时,try / catch 是否是为可能丢失的文件提供错误处理的正确方法?
- 使用 try / catch,如果无法从 GCS 中找到一个文件,上述整套步骤是否会失败?
如果有关于如何执行此操作的任何具体文档,请 link 附上您的答案:)
我最终找到了上述问题的答案。
我必须在 TextIO.readAll() 之后添加代码 .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW),如下所示。
List<String> filepaths = new ArrayList<String>("filepath1", "filepath2", "filepathMissing");
return pipeline
.apply("GatherFiles", Create.of(filepaths)).setCoder(StringUtf8Coder.of())
.apply("GatherFileData", TextIO.readAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
.apply("ApplyCustomDoFn", ParDo.of(new CustomDoFn()))
.apply("Group", GroupByKey.<String, String>create())
.apply("AnotherCustomDoFn", ParDo.of(new AnotherCustomDoFn()));
此修复的不幸问题是,如果文件丢失,您的管道将在管道步骤输出一条 INFO 消息,但不会在管道外抛出任何错误。如果您希望该文件存在,您可能不知道该文件未包含在内,除非您在数据流中明确检查管道步骤的日志。
我有一个管道步骤块,可以从 GCS 存储桶中的特定文件中读取数据。代码类似于以下内容:
List<String> filepaths = new ArrayList<String>("filepath1", "filepath2", "filepathMissing");
return pipeline
.apply("GatherFiles", Create.of(filepaths)).setCoder(StringUtf8Coder.of())
.apply("GatherFileData", TextIO.readAll())
.apply("ApplyCustomDoFn", ParDo.of(new CustomDoFn()))
.apply("Group", GroupByKey.<String, String>create())
.apply("AnotherCustomDoFn", ParDo.of(new AnotherCustomDoFn()));
如果 GCS 中缺少其中一个文件路径(例如,缺少 "filepathMissing"),整个管道就会崩溃。我正在考虑围绕这组管道步骤添加一个 try/catch,但我不确定这样做的后果。
我的问题:
- 使用
TextIO.readAll()
时,try / catch 是否是为可能丢失的文件提供错误处理的正确方法? - 使用 try / catch,如果无法从 GCS 中找到一个文件,上述整套步骤是否会失败?
如果有关于如何执行此操作的任何具体文档,请 link 附上您的答案:)
我最终找到了上述问题的答案。
我必须在 TextIO.readAll() 之后添加代码 .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW),如下所示。
List<String> filepaths = new ArrayList<String>("filepath1", "filepath2", "filepathMissing");
return pipeline
.apply("GatherFiles", Create.of(filepaths)).setCoder(StringUtf8Coder.of())
.apply("GatherFileData", TextIO.readAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
.apply("ApplyCustomDoFn", ParDo.of(new CustomDoFn()))
.apply("Group", GroupByKey.<String, String>create())
.apply("AnotherCustomDoFn", ParDo.of(new AnotherCustomDoFn()));
此修复的不幸问题是,如果文件丢失,您的管道将在管道步骤输出一条 INFO 消息,但不会在管道外抛出任何错误。如果您希望该文件存在,您可能不知道该文件未包含在内,除非您在数据流中明确检查管道步骤的日志。