Empty side outputs throwing NPE on SDK 2.0.0 for Dataflow/Beam

We are trying to migrate our Dataflow/Beam pipelines from 2.0.0-beta3 to 2.0.0.

However, when we use 2.0.0, the pipeline fails with an NPE deep inside the Dataflow/Beam API. When we switch back to 2.0.0-beta3, it works again.

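The only changes made to the code were to incorporate the API changes in the 2.0.0 SDK. We changed nothing else. The problem seems to occur when a side output is empty. Empty side outputs work fine on 2.0.0-beta3.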

Are we doing something wrong in our migration to 2.0.0?

Here is an example that reproduces the problem. Run it with the following arguments:

--project=<project-id> 
--runner=DirectRunner 
--tempLocation=gs://<your-bucket> 
--stagingLocation=gs://<your-bucket>

2.0.0-beta3 (works fine)

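import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class EmptySideOutputNPE implements Serializable {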
    private static final TupleTag<TableRow> mainOutputTag = new TupleTag<TableRow>("mainOutputTag") {
    };
    private static final TupleTag<TableRow> sideOutputTag = new TupleTag<TableRow>("sideOutputTag") {
    };
    private static final TupleTag<TableRow> possibleEmptySideOutputTag = new TupleTag<TableRow>("possibleEmptySideOutputTag") {
    };

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(PipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        //Read from BigQuery public dataset
        PCollectionTuple results = pipeline.apply("Read-BQ", BigQueryIO.Read.from("bigquery-samples:wikipedia_benchmark.Wiki1k"))
                .apply(ParDo.of(new DoFn<TableRow, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        TableRow inputRow = c.element();
                        //output the title to main output tag
                        TableRow titleRow = new TableRow();
                        titleRow.set("col", inputRow.get("title"));
                        c.output(titleRow);

                        //output the language to the side output
                        TableRow languageRow = new TableRow();
                        languageRow.set("col", inputRow.get("language"));
                        c.sideOutput(sideOutputTag, languageRow);

                        //don't output anything for the possibleEmptySideOutputTag tag
                    }
                }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag).and(possibleEmptySideOutputTag)));
        //write the results:
        results.get(mainOutputTag).apply("Title write",
                BigQueryIO.Write.to("<project-id>:<dataset>.2_0_0_sdk_test_title")
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));
        results.get(sideOutputTag).apply("Language write",
                BigQueryIO.Write.to("<project-id>:<dataset>.2_0_0_sdk_test_language")
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));
        results.get(possibleEmptySideOutputTag).apply("Empty write",
                BigQueryIO.Write.to("<project-id>:<dataset>.2_0_0_sdk_test_empty")
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));
        pipeline.run();
    }

    private static TableSchema getTableSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("col").setType("STRING"));
        return new TableSchema().setFields(fields);
    }
}

2.0.0 (NPE)

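import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class EmptySideOutputNPE implements Serializable {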
    private static final TupleTag<TableRow> mainOutputTag = new TupleTag<TableRow>("mainOutputTag") {
    };
    private static final TupleTag<TableRow> sideOutputTag = new TupleTag<TableRow>("sideOutputTag") {
    };
    private static final TupleTag<TableRow> possibleEmptySideOutputTag = new TupleTag<TableRow>("possibleEmptySideOutputTag") {
    };

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(PipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        //Read from BigQuery public dataset
        PCollectionTuple results = pipeline.apply("Read-BQ", BigQueryIO.read().from("bigquery-samples:wikipedia_benchmark.Wiki1k"))
                .apply(ParDo.of(new DoFn<TableRow, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        TableRow inputRow = c.element();
                        //output the title to main output tag
                        TableRow titleRow = new TableRow();
                        titleRow.set("col", inputRow.get("title"));
                        c.output(titleRow);

                        //output the language to the side output
                        TableRow languageRow = new TableRow();
                        languageRow.set("col", inputRow.get("language"));
                        c.output(sideOutputTag, languageRow);

                        //don't output anything for the possibleEmptySideOutputTag tag
                    }
                }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag).and(possibleEmptySideOutputTag)));
        //write the results:
        results.get(mainOutputTag).apply("Title write",
                BigQueryIO.writeTableRows().to("<project-id>:<dataset>.2_0_0_sdk_test_title")
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));
        results.get(sideOutputTag).apply("Language write",
                BigQueryIO.writeTableRows().to("<project-id>:<dataset>.2_0_0_sdk_test_language")
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));
        results.get(possibleEmptySideOutputTag).apply("Empty write",
                BigQueryIO.writeTableRows().to("<project-id>:<dataset>.2_0_0_sdk_test_empty")
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));
        pipeline.run();
    }

    private static TableSchema getTableSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("col").setType("STRING"));
        return new TableSchema().setFields(fields);
    }
}

    23:43:09,484 0    [main] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase - Starting BigQuery extract job: beam_job_885a1329f1a045d6a6422c975690967e_emptysideoutputnpepolleyg0715134309b6259542-extract
    23:43:11,209 1725 [main] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Started BigQuery job: {jobId=beam_job_885a1329f1a045d6a6422c975690967e_emptysideoutputnpepolleyg0715134309b6259542-extract, projectId=<redacted>}.
    bq show -j --format=prettyjson --project_id=<redacted> beam_job_885a1329f1a045d6a6422c975690967e_emptysideoutputnpepolleyg0715134309b6259542-extract
    23:43:12,718 3234 [main] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase - BigQuery extract job completed: beam_job_885a1329f1a045d6a6422c975690967e_emptysideoutputnpepolleyg0715134309b6259542-extract
    23:43:14,738 5254 [direct-runner-worker] INFO  org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for pattern gs://nonsense/BigQueryExtractTemp/885a1329f1a045d6a6422c975690967e/000000000000.avro
    23:43:18,171 8687 [direct-runner-worker] INFO  org.apache.beam.sdk.io.FileBasedSource - Filepattern gs://nonsense/BigQueryExtractTemp/885a1329f1a045d6a6422c975690967e/000000000000.avro matched 1 files with total size 60370
    23:43:18,653 9169 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/956c7d7b866941aaa406bd9e5cb63aab/399d59ec-2475-4d07-9fa9-25feadf53737.
    23:43:18,653 9169 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/4377160da6184249a5ffc7cc27155265/8db1d8c4-9e4d-4093-8b9f-3e892de78057.
    23:43:22,839 13355 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/956c7d7b866941aaa406bd9e5cb63aab/1b544d4b-650c-4e05-abc0-f80318278a2f.
    23:43:22,849 13365 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/4377160da6184249a5ffc7cc27155265/2f3164e0-674e-4926-925f-678657587e75.
    23:43:27,428 17944 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/4377160da6184249a5ffc7cc27155265/b0d8ae7a-e6b0-48ac-a0a1-fd3e0fa17f75.
    23:43:27,434 17950 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/956c7d7b866941aaa406bd9e5cb63aab/b77b17e3-562c-47b0-8a6c-ee8eb7745fc8.
    23:43:33,242 23758 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://nonsense/BigQueryWriteTemp/1f559dd752eb43f7bd1af1c881c21235/a8e51a20-408d-4628-abf3-bbdb2ebd9527.
    23:43:35,046 25562 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Started BigQuery job: {jobId=956c7d7b866941aaa406bd9e5cb63aab_e9f0a5890698d99399a6106c26d65de2_00001-0, projectId=<redacted>}.
    bq show -j --format=prettyjson --project_id=<redacted> 956c7d7b866941aaa406bd9e5cb63aab_e9f0a5890698d99399a6106c26d65de2_00001-0
    23:43:35,126 25642 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Started BigQuery job: {jobId=4377160da6184249a5ffc7cc27155265_a6c30233d929e6958a536246c31fe3d1_00001-0, projectId=<redacted>}.
    bq show -j --format=prettyjson --project_id=<redacted> 4377160da6184249a5ffc7cc27155265_a6c30233d929e6958a536246c31fe3d1_00001-0
    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
        at com.pipelines.EmptySideOutputNPE.main(EmptySideOutputNPE.java:85)
    Caused by: java.lang.NullPointerException
        at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:97)

Observations:

  1. When possibleEmptySideOutputTag is removed from the pipeline, it runs fine on 2.0.0, i.e. .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
  2. When 1+ rows are added to possibleEmptySideOutputTag in the ParDo, it runs fine on 2.0.0.

This looks like https://issues.apache.org/jira/browse/BEAM-2406, which has already been fixed. The fix is available at HEAD or in the upcoming 2.1.0 release.
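To make the two observations concrete, here is a small plain-Java model of how an output that never receives a row can end in an NPE while the same code works once a single row is present. This is not Beam's code and makes no claim about what WriteTables actually does. The class EmptyOutputModel, both methods, and the destination names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model only; it does not reproduce Beam's BigQuery write path.
public class EmptyOutputModel {
    // Counts the rows staged for one destination without a null check.
    // Throws NullPointerException if nothing was ever staged for it.
    static int unguardedRowCount(Map<String, List<String>> staged, String destination) {
        return staged.get(destination).size();
    }

    // Same lookup, but a destination with no staged rows counts as empty.
    static int guardedRowCount(Map<String, List<String>> staged, String destination) {
        return staged.getOrDefault(destination, Collections.<String>emptyList()).size();
    }

    public static void main(String[] args) {
        Map<String, List<String>> staged = new HashMap<>();
        staged.computeIfAbsent("title", k -> new ArrayList<>()).add("row");
        staged.computeIfAbsent("language", k -> new ArrayList<>()).add("row");
        // Nothing is staged for "empty", mirroring possibleEmptySideOutputTag.

        System.out.println(guardedRowCount(staged, "title"));
        System.out.println(guardedRowCount(staged, "empty"));
        try {
            unguardedRowCount(staged, "empty");
        } catch (NullPointerException e) {
            System.out.println("NPE for the destination that received no rows");
        }
    }
}
```

In this model, staging even one row for "empty" makes the unguarded version succeed, which matches observation 2. Dropping the destination altogether means it is never looked up, which matches observation 1.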