Sequential Execution in Apache Beam - Java SDK 2.18.0

Hello, I have a few queries that I want to run, saving the results sequentially, using Apache Beam. I've seen some similar questions but couldn't find an answer. I'm used to designing pipelines with Airflow, and I'm new to Apache Beam. I'm using the Dataflow runner. This is my current code; I want query2 to run only after the query1 results have been saved to their corresponding table. How can I chain them?

    PCollection<TableRow> resultsStep1 = getData("Run Query 1",
            "Select * FROM basetable");

    resultsStep1.apply("Save Query1 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
                    .to("resultsStep1")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

    PCollection<TableRow> resultsStep2 = getData("Run Query 2",
            "Select * FROM resultsStep1");

    resultsStep2.apply("Save Query2 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
                    .to("resultsStep2")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

Here is the definition of my getData function:

    private PCollection<TableRow> getData(final String taskName, final String query) {
        return pipeline.apply(taskName,
                BigQueryIO.readTableRowsWithSchema()
                        .fromQuery(query)
                        .usingStandardSql()
                        .withCoder(TableRowJsonCoder.of()));
    }

Edit (update): Result: "You can't sequence the completion of a BigQuery write with other steps of your pipeline."

I think this is a big limitation when designing pipelines. Source: https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations
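
One way to work around this limitation, sketched below, is to sequence the two steps at the driver level rather than inside a single pipeline graph: build and run a first pipeline containing only the query1 read/write, block on waitUntilFinish(), and only then build and run a second pipeline for query2. This is only a sketch; the variable names are illustrative, and the bodies of the two pipelines are elided (they would be the "Run Query 1"/"Save Query1 data" and "Run Query 2"/"Save Query2 data" steps shown above).

    // Sketch only: run the two steps as separate pipelines and block in between.
    // Assumes the usual imports from org.apache.beam.sdk and org.apache.beam.sdk.options.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    Pipeline firstPipeline = Pipeline.create(options);
    // ... apply "Run Query 1" and "Save Query1 data" to firstPipeline ...
    firstPipeline.run().waitUntilFinish(); // returns only after the first write has completed

    Pipeline secondPipeline = Pipeline.create(options);
    // ... apply "Run Query 2" and "Save Query2 data" to secondPipeline ...
    secondPipeline.run().waitUntilFinish();

The trade-off is two separate Dataflow jobs instead of one, but the second query cannot start before the first table has been written, since the second pipeline isn't even submitted until the first one finishes.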

You can use the Wait transform to accomplish this. Below is a contrived example:

    PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
    data.apply(Wait.on(firstWriteResults))
        // Windows of this intermediate PCollection will be processed no earlier than when
        // the respective window of firstWriteResults closes.
        .apply(ParDo.of(...write to second database...));

You can find more details in the API documentation here - https://beam.apache.org/releases/javadoc/2.17.0/index.html?org/apache/beam/sdk/transforms/Wait.html
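
To make the contrived example a little more concrete, here is a minimal, self-contained sketch of the same Wait.on pattern. The placeholder DoFns, the class name WaitOnExample, and the transform names are illustrative only and stand in for real sink writes; as the limitation quoted above notes, the pattern needs a first write that exposes a signal PCollection, which the BigQuery sink does not provide for this purpose.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    public class WaitOnExample {

        public static void main(String[] args) {
            Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

            PCollection<String> data = pipeline.apply("Create data", Create.of("a", "b", "c"));

            // First "write": a placeholder DoFn that would persist each element.
            // Its PCollection<Void> output is used purely as a completion signal.
            PCollection<Void> firstWriteResults = data.apply("First write",
                    ParDo.of(new DoFn<String, Void>() {
                        @ProcessElement
                        public void processElement(@Element String element) {
                            // ... write element to the first sink ...
                        }
                    }));

            // Wait.on holds back each window of `data` until the corresponding window
            // of firstWriteResults has closed, so the second write only starts once
            // the first write has finished.
            data.apply(Wait.on(firstWriteResults))
                    .apply("Second write", ParDo.of(new DoFn<String, Void>() {
                        @ProcessElement
                        public void processElement(@Element String element) {
                            // ... write element to the second sink ...
                        }
                    }));

            pipeline.run();
        }
    }

In the original question's terms, data would be resultsStep1 and the two ParDo writes would be replaced by the actual sinks, provided the first sink produces a result PCollection to wait on.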