Handling empty PCollections with BigQuery in Apache Beam
Using the code below, I get the following error when trying to write to BigQuery.
I am using Apache Beam 2.0.0.
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
If I change the text.startsWith check to "D" instead of "X", everything works fine (i.e. there is output).
Is there any way to trap or guard against empty PCollections?
Based on the stack trace, the error actually occurs inside BigQueryIO - the file written to my bucket is 0 bytes, which presumably trips up BigQueryIO.
My use case is a side output for dead letters: I hit this error whenever my job produces no dead-letter output, so handling it robustly would be useful.
The job really needs to run in both batch and streaming mode. My best guess is to write the output to GCS / TextIO in batch mode and to BigQuery when streaming - does that sound reasonable?
Any help gratefully received.
import java.util.Arrays;
import java.util.List;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class EmptyPCollection {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setTempLocation("gs://<your-bucket-here>/temp");
        Pipeline pipeline = Pipeline.create(options);

        String schema = "{\"fields\": [{\"name\": \"pet\", \"type\": \"string\", \"mode\": \"required\"}]}";
        String table = "<your-dataset>.<your-table>";

        List<String> pets = Arrays.asList("Dog", "Cat", "Goldfish");
        PCollection<String> inputText =
            pipeline.apply(Create.of(pets)).setCoder(StringUtf8Coder.of());

        // No input element starts with "X", so `rows` ends up empty.
        PCollection<TableRow> rows = inputText.apply(ParDo.of(new DoFn<String, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String text = c.element();
                if (text.startsWith("X")) { // change "X" to "D" and it works fine
                    TableRow row = new TableRow();
                    row.set("pet", text);
                    c.output(row);
                }
            }
        }));

        rows.apply(BigQueryIO.writeTableRows().to(table).withJsonSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        pipeline.run().waitUntilFinish();
    }
}
[direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://<your-bucket>/temp/BigQueryWriteTemp/05c7a7c0786a4656abad97f11ef23d8e/2675e1c7-f4d7-4f78-a85f-a38095b57e6b.
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
at EmptyPCollection.main(EmptyPCollection.java:54)
Caused by: java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:97)
This looks like a bug in the BigQuery sink implementation in Apache Beam. The Apache Beam Jira would be the appropriate place to file it.
I have filed https://issues.apache.org/jira/browse/BEAM-2406 to track this issue.
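On the "can I trap or monitor empty PCollections" part of the question: until a fix lands, one option is to count the collection and log (or branch) when the count is zero. Below is a minimal sketch, not a tested workaround: it assumes the `rows` PCollection<TableRow> from the code above, an SLF4J `LOG` field declared on the enclosing class, and bounded (batch) input - for unbounded input `Count.globally()` needs non-default triggering.

```java
// Hypothetical guard: Count.globally() emits exactly one Long for a
// bounded PCollection, so this DoFn runs once and can flag emptiness.
rows.apply("CountRows", Count.globally())
    .apply("WarnIfEmpty", ParDo.of(new DoFn<Long, Long>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (c.element() == 0L) {
          // LOG is an SLF4J logger assumed to exist on the class.
          LOG.warn("rows is empty; BigQuery write may hit BEAM-2406.");
        }
        c.output(c.element());
      }
    }));
```

This only monitors the collection; it does not suppress the write itself, but the same count could be fed back as a side input if you need to skip the sink entirely.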