在 BigQuery 上插入行:InsertAllRequest 与 BigQueryIO.writeTableRows()
Inserting rows on BigQuery: InsertAllRequest Vs BigQueryIO.writeTableRows()
当我使用 writeTableRows
在 BigQuery 上插入行时,与 InsertAllRequest
相比,性能确实很差。显然,设置不正确。
用例 1: 我写了一个 Java 程序来使用 Twitter4j 处理 'sample' Twitter 流。当收到推文时,我使用以下方法将其写入 BigQuery:
insertAllRequestBuilder.addRow(rowContent);
当我 运行 我的 Mac 这个程序时,它每分钟直接向 BigQuery table 插入大约 1000 行。我认为我可以通过 运行在集群上运行数据流作业来做得更好。
用例 2: 当收到推文时,我将其写入 Google 的 PubSub 主题。我 运行 来自我的 Mac,它每分钟发送大约 1000 条消息。
我写了一个 Dataflow 作业来读取这个主题并使用 BigQueryIO.writeTableRows()
写入 BigQuery。我有一个 8 机 Dataproc 集群。我使用 DataflowRunner 在该集群的主节点上启动了这项工作。速度慢得令人难以置信!大约每 5 分钟 100 行。这是相关代码的片段:
statuses.apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = new TableRow();
Status status = c.element();
row.set("Id", status.getId());
row.set("Text", status.getText());
row.set("RetweetCount", status.getRetweetCount());
row.set("FavoriteCount", status.getFavoriteCount());
row.set("Language", status.getLang());
row.set("ReceivedAt", null);
row.set("UserId", status.getUser().getId());
row.set("CountryCode", status.getPlace().getCountryCode());
row.set("Country", status.getPlace().getCountry());
c.output(row);
}
}))
.apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)//
.withSchema(schema)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
.withNumFileShards(1000)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
我做错了什么?我应该使用 'SparkRunner' 吗?我如何确认它 运行ning 在我集群的所有节点上?
使用 BigQuery,您可以:
- 流式传输数据。低延迟,每秒高达 10 万行,需要成本。
- 批量输入数据。延迟更高,吞吐量惊人,完全免费。
这就是您遇到的不同之处。如果您只想摄取 1000 行,批处理速度会明显变慢。与 100 亿行相同,通过批处理会更快,而且没有成本。
Dataflow/Bem 的 BigQueryIO.writeTableRows
可以流式或批处理数据。
使用BigQueryIO.Write.Method.FILE_LOADS
粘贴的代码是选择批处理。
当我使用 writeTableRows
在 BigQuery 上插入行时,与 InsertAllRequest
相比,性能确实很差。显然,设置不正确。
用例 1: 我写了一个 Java 程序来使用 Twitter4j 处理 'sample' Twitter 流。当收到推文时,我使用以下方法将其写入 BigQuery:
insertAllRequestBuilder.addRow(rowContent);
当我 运行 我的 Mac 这个程序时,它每分钟直接向 BigQuery table 插入大约 1000 行。我认为我可以通过 运行在集群上运行数据流作业来做得更好。
用例 2: 当收到推文时,我将其写入 Google 的 PubSub 主题。我 运行 来自我的 Mac,它每分钟发送大约 1000 条消息。
我写了一个 Dataflow 作业来读取这个主题并使用 BigQueryIO.writeTableRows()
写入 BigQuery。我有一个 8 机 Dataproc 集群。我使用 DataflowRunner 在该集群的主节点上启动了这项工作。速度慢得令人难以置信!大约每 5 分钟 100 行。这是相关代码的片段:
statuses.apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = new TableRow();
Status status = c.element();
row.set("Id", status.getId());
row.set("Text", status.getText());
row.set("RetweetCount", status.getRetweetCount());
row.set("FavoriteCount", status.getFavoriteCount());
row.set("Language", status.getLang());
row.set("ReceivedAt", null);
row.set("UserId", status.getUser().getId());
row.set("CountryCode", status.getPlace().getCountryCode());
row.set("Country", status.getPlace().getCountry());
c.output(row);
}
}))
.apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)//
.withSchema(schema)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
.withNumFileShards(1000)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
我做错了什么?我应该使用 'SparkRunner' 吗?我如何确认它 运行ning 在我集群的所有节点上?
使用 BigQuery,您可以:
- 流式传输数据。低延迟,每秒高达 10 万行,需要成本。
- 批量输入数据。延迟更高,吞吐量惊人,完全免费。
这就是您遇到的不同之处。如果您只想摄取 1000 行,批处理速度会明显变慢。与 100 亿行相同,通过批处理会更快,而且没有成本。
Dataflow/Bem 的 BigQueryIO.writeTableRows
可以流式或批处理数据。
使用BigQueryIO.Write.Method.FILE_LOADS
粘贴的代码是选择批处理。