在 BigQuery 上插入行:InsertAllRequest 与 BigQueryIO.writeTableRows()

Inserting rows on BigQuery: InsertAllRequest Vs BigQueryIO.writeTableRows()

当我使用 writeTableRows 在 BigQuery 上插入行时,与 InsertAllRequest 相比,性能确实很差。显然,设置不正确。

用例 1: 我写了一个 Java 程序来使用 Twitter4j 处理 'sample' Twitter 流。当收到推文时,我使用以下方法将其写入 BigQuery:

insertAllRequestBuilder.addRow(rowContent);

当我 运行 我的 Mac 这个程序时,它每分钟直接向 BigQuery table 插入大约 1000 行。我认为我可以通过 运行在集群上运行数据流作业来做得更好。

用例 2: 当收到推文时,我将其写入 Google 的 PubSub 主题。我 运行 来自我的 Mac,它每分钟发送大约 1000 条消息。

我写了一个 Dataflow 作业来读取这个主题并使用 BigQueryIO.writeTableRows() 写入 BigQuery。我有一个 8 机 Dataproc 集群。我使用 DataflowRunner 在该集群的主节点上启动了这项工作。速度慢得令人难以置信!大约每 5 分钟 100 行。这是相关代码的片段:

statuses.apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        TableRow row = new TableRow();
        Status status = c.element();
        row.set("Id", status.getId());
        row.set("Text", status.getText());
        row.set("RetweetCount", status.getRetweetCount());
        row.set("FavoriteCount", status.getFavoriteCount());
        row.set("Language", status.getLang());
        row.set("ReceivedAt", null);
        row.set("UserId", status.getUser().getId());
        row.set("CountryCode", status.getPlace().getCountryCode());
        row.set("Country", status.getPlace().getCountry());
        c.output(row);
    }
})) 
    .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)//
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
            .withNumFileShards(1000)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

我做错了什么?我应该使用 'SparkRunner' 吗?我如何确认它 运行ning 在我集群的所有节点上?

使用 BigQuery,您可以:

  • 流式传输数据。低延迟,每秒高达 10 万行,需要成本。
  • 批量输入数据。延迟更高,吞吐量惊人,完全免费。

这就是您遇到的不同之处。如果您只想摄取 1000 行,批处理速度会明显变慢。与 100 亿行相同,通过批处理会更快,而且没有成本。

Dataflow/Bem 的 BigQueryIO.writeTableRows 可以流式或批处理数据。

使用BigQueryIO.Write.Method.FILE_LOADS粘贴的代码是选择批处理。