如果 Google Cloud Dataflow 中不存在,则使用 TextIO.Write 创建新存储桶
Using TextIO.Write to create new bucket if isn't exist in Google Cloud Dataflow
我正在尝试检查在 ParDo 中创建的文件是否不同于 GCS 中存储的文件。
为此,我尝试阅读文件并比较它们的差异。
Pipeline p = Pipeline.create(c.getPipelineOptions());
try {
PCollection<String> lines = p.apply(
TextIO.Read
.named("Read Section on GS")
.from("gs://failbucket/foo/boo/ret.txt"))
.apply(ParDo
.of(new Util.viewDifferences2(c.element))
.named("only different"));
lines.apply(
TextIO.Write.named("Write Document Different")
.to(pathGS)
.withSuffix(".json"));
p.run();
} catch (Exception e) {
p = Pipeline.create(c.getPipelineOptions());
PCollection<String> lines = p.apply(Create.of(sectionContent));
lines.apply(TextIO.Write.named("Write new Document")
.to("gs://failbucket/foo/boo/ret").withSuffix(".txt"));
p.run();
}
最初文件不存在,所以出现异常,但是当尝试创建时出现此消息 "Output path does not exist or is not writeable"
你知道我如何创建全新路径吗?
谢谢
您可以只使用选项 withoutValidation 使验证无效,但它会在 gs 中创建相应的存储桶。但是如果父桶不存在就会抛出异常
在您的情况下,如果 "failbucket" 存储桶不存在,它将抛出以下错误。
Caused by: java.io.IOException: Failed to write to GCS path gs://failbucket/foo/boo/ret/xxx.txt
但是,如果 "failbucket" 存储桶存在于您的 gs 项目中,那么如果这些存储桶不存在,它将创建 foo/boo/ 个存储桶。
在您的情况下,如果 "failbucket" 存储桶存在于您的 gs
中,则以下应该没问题
lines.apply(TextIO.Write.named("Write new Document")
.to("gs://failbucket/foo/boo/ret")
.withoutValidation()
.withSuffix(".txt"));
您的异常处理代码似乎正在提交一个数据流管道,其唯一目的是在 Google 云存储中创建一个空文件。
这不是特别有效。相反,您可以直接使用 Google Cloud Storage API 与您的 GCS 存储桶进行交互。 API 就此目的而言更加高效和全面。例如,您可以在启动主数据流管道之前使用此 API。
另一种方法是试用 gsutil
工具。此命令行工具具有与 GCS 存储桶交互的类似功能。您也可以在 Java 程序中调用它,或者在启动 Java 程序之前单独调用它。
通常不鼓励在 Dataflow 中禁用 TextIO
验证的方法。这种验证可以提供在 Cloud Platform 中开始执行管道之前快速和及早发现错误的好处。也就是说,在提交作业时无法验证先决条件的极少数情况下,应禁用验证。
我正在尝试检查在 ParDo 中创建的文件是否不同于 GCS 中存储的文件。
为此,我尝试阅读文件并比较它们的差异。
Pipeline p = Pipeline.create(c.getPipelineOptions());
try {
PCollection<String> lines = p.apply(
TextIO.Read
.named("Read Section on GS")
.from("gs://failbucket/foo/boo/ret.txt"))
.apply(ParDo
.of(new Util.viewDifferences2(c.element))
.named("only different"));
lines.apply(
TextIO.Write.named("Write Document Different")
.to(pathGS)
.withSuffix(".json"));
p.run();
} catch (Exception e) {
p = Pipeline.create(c.getPipelineOptions());
PCollection<String> lines = p.apply(Create.of(sectionContent));
lines.apply(TextIO.Write.named("Write new Document")
.to("gs://failbucket/foo/boo/ret").withSuffix(".txt"));
p.run();
}
最初文件不存在,所以出现异常,但是当尝试创建时出现此消息 "Output path does not exist or is not writeable"
你知道我如何创建全新路径吗?
谢谢
您可以只使用选项 withoutValidation 使验证无效,但它会在 gs 中创建相应的存储桶。但是如果父桶不存在就会抛出异常
在您的情况下,如果 "failbucket" 存储桶不存在,它将抛出以下错误。
Caused by: java.io.IOException: Failed to write to GCS path gs://failbucket/foo/boo/ret/xxx.txt
但是,如果 "failbucket" 存储桶存在于您的 gs 项目中,那么如果这些存储桶不存在,它将创建 foo/boo/ 个存储桶。
在您的情况下,如果 "failbucket" 存储桶存在于您的 gs
中,则以下应该没问题 lines.apply(TextIO.Write.named("Write new Document")
.to("gs://failbucket/foo/boo/ret")
.withoutValidation()
.withSuffix(".txt"));
您的异常处理代码似乎正在提交一个数据流管道,其唯一目的是在 Google 云存储中创建一个空文件。
这不是特别有效。相反,您可以直接使用 Google Cloud Storage API 与您的 GCS 存储桶进行交互。 API 就此目的而言更加高效和全面。例如,您可以在启动主数据流管道之前使用此 API。
另一种方法是试用 gsutil
工具。此命令行工具具有与 GCS 存储桶交互的类似功能。您也可以在 Java 程序中调用它,或者在启动 Java 程序之前单独调用它。
通常不鼓励在 Dataflow 中禁用 TextIO
验证的方法。这种验证可以提供在 Cloud Platform 中开始执行管道之前快速和及早发现错误的好处。也就是说,在提交作业时无法验证先决条件的极少数情况下,应禁用验证。