Google Dataflow 在 BigQuery 中写入多行

Google Dataflow write multiple line in BigQuery

我有一个简单的流程,目的是在一个 BigQuery 中写两行 Table。 我使用 DynamicDestinations 因为在那之后我将写多个 Table,在那个例子中它是相同的 table... 问题是我的 BigQuery table 最后只有 1 行。 它堆栈跟踪我在第二个 insert

上看到以下错误

” 状态: { 代码:6
消息:"Already Exists: Job sampleprojet3:b9912b9b05794aec8f4292b2ae493612_eeb0082ade6f4a58a14753d1cc92ddbc_00001-0"
} “

这是什么意思? 与此限制有关吗? https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/550 我怎样才能完成这项工作?

我用 BeamSDK 2.0.0,我试过 2.1.0(结果相同)

我的启动方式:

mvn compile exec:java -Dexec.mainClass=fr.gireve.dataflow.LogsFlowBug -Dexec.args="--runner=DataflowRunner --inputDir=gs://sampleprojet3.appspot.com / --project=sampleprojet3 --stagingLocation=gs://dataflow-sampleprojet3/tmp" -Pdataflow-runner

    Pipeline p = Pipeline.create(options);

    final List<String> tableNameTableValue = Arrays.asList("table1:value1", "table1:value2", "table2:value1", "table2:value2");

    p.apply(Create.of(tableNameTableValue)).setCoder(StringUtf8Coder.of())
            .apply(BigQueryIO.<String>write()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .to(new DynamicDestinations<String, KV<String, String>>() {
                @Override
                public KV<String, String> getDestination(ValueInSingleWindow<String> element) {
                    final String[] split = element.getValue().split(":");
                    return KV.of(split[0], split[1]) ;
                }

                @Override
                public Coder<KV<String, String>> getDestinationCoder() {
                    return KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
                }

                @Override
                public TableDestination getTable(KV<String, String> row) {
                    String tableName = row.getKey();
                    String tableSpec = "sampleprojet3:testLoadJSON." + tableName;
                    return new TableDestination(tableSpec, "Table " + tableName);
                }

                @Override
                public TableSchema getSchema(KV<String, String> row) {
                    List<TableFieldSchema> fields = new ArrayList<>();
                    fields.add(new TableFieldSchema().setName("myColumn").setType("STRING"));
                    TableSchema ts = new TableSchema();
                    ts.setFields(fields);
                    return ts;
                }
            })
            .withFormatFunction(new SerializableFunction<String, TableRow>() {
                public TableRow apply(String row) {
                    TableRow tr = new TableRow();
                    tr.set("myColumn", row);
                    return tr;
                }
            }));

    p.run().waitUntilFinish();

谢谢

DynamicDestinations 将每个元素与 destination 相关联 - 即元素应该去的地方。元素根据其目的地路由到 BigQuery tables:1 destination = 1 BigQuery table with a schema: destination should include just enough information to produce a TableDestination 和一个架构。具有相同目的地的元素转到相同的 table,具有不同目的地的元素转到不同的 table。

您的代码片段使用 DynamicDestinations,目标类型包含 元素和 table,这是不必要的,当然也违反了上面的约束:具有不同目的地的元素最终会到达相同的 table:例如KV("table1", "value1")KV("table1", "value2") 是不同的目的地,但您的 getTable 将它们映射到相同的 table table1.

您需要从目标类型中删除该元素。这也将导致更简单的代码。作为旁注,我认为您不需要覆盖 getDestinationCoder() - 它可以自动推断出来。

试试这个:

        .to(new DynamicDestinations<String, String>() {
            @Override
            public String getDestination(ValueInSingleWindow<String> element) {
                return element.getValue().split(":")[0];
            }

            @Override
            public TableDestination getTable(String tableName) {
                return new TableDestination(
                    "sampleprojet3:testLoadJSON." + tableName, "Table " + tableName);
            }

            @Override
            public TableSchema getSchema(String tableName) {
                List<TableFieldSchema> fields = Arrays.asList(
                    new TableFieldSchema().setName("myColumn").setType("STRING"));
                return new TableSchema().setFields(fields);
            }
        })