Do Beam pipelines preserve ORDER BY in a source query when sourcing from BigQuery?

We have a Beam pipeline, written in Java, which we run on GCP Dataflow. It is very simple: it takes a SQL query as a PipelineOption, issues that query against BigQuery, and for each row in the returned dataset constructs a message and puts it onto a Pub/Sub topic.

import com.google.api.services.bigquery.model.TableRow;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@code BigQueryEventReplayer} pipeline runs a supplied SQL query
 * against BigQuery and sends the results one by one to Pub/Sub.
 * The query MUST return a column named 'json'; it is this column
 * (and ONLY this column) that will be sent onward. The column must be a
 * String type and should contain valid JSON.
 */
public class BigQueryEventReplayer {

  private static final Logger logger = LoggerFactory.getLogger(BigQueryEventReplayer.class);

  /**
   * Options for the BigQueryEventReplayer. See descriptions for more info
   */
  public interface Options extends PipelineOptions {
    @Description("SQL query to be run."
        + "An SQL string literal which will be run 'as is'")
    @Required
    ValueProvider<String> getBigQuerySql();

    void setBigQuerySql(ValueProvider<String> value);

    @Description("The name of the topic which data should be published to. "
        + "The name should be in the format of projects/<project-id>/topics/<topic-name>.")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);

    @Description("The ID of the BigQuery dataset targeted by the event")
    @Required
    ValueProvider<String> getBigQueryTargetDataset();

    void setBigQueryTargetDataset(ValueProvider<String> value);

    @Description("The ID of the BigQuery table targeted by the event")
    @Required
    ValueProvider<String> getBigQueryTargetTable();

    void setBigQueryTargetTable(ValueProvider<String> value);

    @Description("The SourceSystem attribute of the event")
    @Required
    ValueProvider<String> getSourceSystem();

    void setSourceSystem(ValueProvider<String> value);

  }

  /**
   * Takes the data from the TableRow and prepares it for the PubSub, including
   * adding attributes to ensure the payload is routed correctly.
   */
  // We would rather use a SimpleFunction here but then we wouldn't be able
  // to inject our value providers. So instead we hackishly make a nested class
  public static class MapQueryToPubsub extends DoFn<TableRow, PubsubMessage> {
    private final ValueProvider<String> targetDataset;
    private final ValueProvider<String> targetTable;
    private final ValueProvider<String> sourceSystem;

    MapQueryToPubsub(
        ValueProvider<String> targetDataset, 
        ValueProvider<String> targetTable, 
        ValueProvider<String> sourceSystem) {
      this.targetDataset = targetDataset;
      this.targetTable = targetTable;
      this.sourceSystem = sourceSystem;
    }

    /**
     * Entry point of DoFn for Dataflow.
     */
    @ProcessElement
    public void processElement(ProcessContext c) {
      TableRow row = c.element();
      if (!row.containsKey("json")) {
        logger.warn("table does not contain column named 'json'; skipping row");
        return;
      }
      Map<String, String> attributes = new HashMap<>();
      attributes.put("sourceSystem", sourceSystem.get());
      attributes.put("targetDataset", targetDataset.get());
      attributes.put("targetTable", targetTable.get());
      String json = (String) row.get("json");
      c.output(new PubsubMessage(json.getBytes(StandardCharsets.UTF_8), attributes));
    }
  }

  /**
   * Run the pipeline. This is the entrypoint for running 'locally'
   */
  public static void main(String[] args) {
    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }

  /**
   * Run the pipeline. This is the entrypoint that GCP will use
   */
  public static PipelineResult run(Options options) {

    Pipeline pipeline = Pipeline.create(options);

    pipeline.apply("Read from BigQuery query",
        BigQueryIO.readTableRows().fromQuery(options.getBigQuerySql()).usingStandardSql().withoutValidation()
            .withTemplateCompatibility())
        .apply("Map data to PubsubMessage",
            ParDo.of(
                new MapQueryToPubsub(
                    options.getBigQueryTargetDataset(),
                    options.getBigQueryTargetTable(),
                    options.getSourceSystem()
                )
            )
        )
        .apply("Write message to PubSub", PubsubIO.writeMessages().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

The BigQuery data being queried is essentially an event log. We recently determined that the order in which these events are inserted into the Pub/Sub topic matters. We can establish the correct order by using an ORDER BY in the query issued against BigQuery, but we are doubtful that this order will be respected when the data is inserted into the Pub/Sub topic.
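For concreteness, the kind of ordered query we pass in via the bigQuerySql option looks roughly like this (the project, dataset, table, and column names are illustrative, not our real ones):

// Illustrative only: an ordered query supplied as the bigQuerySql option.
// Only the 'json' column is actually consumed by the pipeline.
String bigQuerySql =
    "SELECT json "
        + "FROM `my-project.event_log.events` "
        + "ORDER BY event_time ASC";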

Our main concern is this piece of code:

pipeline.apply("Read from BigQuery query",
        BigQueryIO.readTableRows().fromQuery(options.getBigQuerySql()).usingStandardSql().withoutValidation()
            .withTemplateCompatibility())

In the Dataflow console, that seemingly simple command expands into a far more complex step.

A lot is happening in that step (shuffles, etc.), and many of the sub-steps are themselves made up of multiple sub-steps. Moreover, one of the sub-steps is called "ReadFiles", which makes me think Dataflow may be writing the data out to some kind of temporary file storage. All in all, it leaves me doubting whether the ORDER BY in the supplied SQL query will still be honored by the time the rows are published to Pub/Sub.

Does Beam/Dataflow offer any guarantee that the ORDER BY will be preserved in this scenario, or do I have to introduce some kind of ordering into my pipeline to guarantee that the desired order is respected?

A BigQueryIO read basically consists of an export job that writes the query/table to GCS as Avro files, followed by a read from those files (plus more). So it will not preserve order: the reads are parallelized, and multiple threads will be reading chunks of the files that were created.

In general, distributed processing systems like Dataflow (or Spark, etc.) do not preserve order, and given the parallel nature of their work they are not good at sorting things. Bear in mind that to sort elements you would need to hold everything on a single worker.
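To illustrate what holding everything on one worker looks like in Beam terms, here is a minimal sketch that forces a total order by giving every row the same key and sorting inside the single GroupByKey result. The 'seq' ordering column is an assumption, and this defeats all parallelism, so the whole result set must fit on one worker:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
// TableRow, DoFn, and ParDo imports as in the pipeline above.

// rows is the PCollection<TableRow> produced by the BigQueryIO read.
PCollection<TableRow> ordered = rows
    // Same constant key for every row, so GroupByKey gathers the whole result set.
    .apply("Single key", WithKeys.of("all"))
    // Everything now lands on one worker: the unavoidable cost of a total order.
    .apply("Collect all rows", GroupByKey.create())
    .apply("Sort by seq", ParDo.of(new DoFn<KV<String, Iterable<TableRow>>, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        List<TableRow> buffer = new ArrayList<>();
        c.element().getValue().forEach(buffer::add);
        // 'seq' is an assumed monotonically increasing column in the query.
        buffer.sort(Comparator.comparingLong(
            r -> Long.parseLong(String.valueOf(r.get("seq")))));
        buffer.forEach(c::output);
      }
    }));

And even then, anything that runs after this step (including the Pub/Sub publish) is free to reorder the elements again.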

In fact, ORDER BY is a demanding task even within BigQuery itself.

It is hard to find a workaround here, because these systems simply are not built for this type of task. One option I can think of is adding a ROW_NUMBER(), using it as the timestamp, and adding a window, but that is use-case specific.
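A minimal sketch of that idea, assuming the query is amended to expose a ROW_NUMBER() column (the column name 'rn', and using the row number directly as epoch millis, are both assumptions made for illustration):

import org.joda.time.Instant;
// TableRow, DoFn, ParDo, and PCollection imports as above.

// The query would be amended along these lines (illustrative):
//   SELECT json, ROW_NUMBER() OVER (ORDER BY event_time) AS rn FROM ...
// rows is the PCollection<TableRow> produced by the BigQueryIO read.
PCollection<TableRow> stamped = rows.apply("Stamp with row number",
    ParDo.of(new DoFn<TableRow, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        long rn = Long.parseLong(String.valueOf(c.element().get("rn")));
        // Use the row number as a synthetic element timestamp so that
        // downstream windowing and triggering can reason about the order.
        c.outputWithTimestamp(c.element(), new Instant(rn));
      }
    }));

You would still need a window and something downstream that emits in timestamp order; none of this makes the Pub/Sub publish itself ordered.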

Also, PubsubIO does not preserve order when publishing.