将 BigQuery table 流式传输到 Google Pub/Sub

Stream BigQuery table into Google Pub/Sub

我有一个 Google bigQuery Table,我想将整个 table 流式传输到 pub-sub 主题

easy/fast 应该怎样做?

提前谢谢你,

这实际上取决于 table 的大小。

如果它很小 table(几千条记录,几个 doze 列),那么您可以设置一个进程来查询整个 table,将响应转换为 JSON 数组,并推送到 pub-sub.

如果它很大 table(millions/billions 条记录,数百列),您必须导出到文件,然后 prepare/ship 到 pub-sub

这也取决于您的分区策略 - 如果您的 table 设置为按日期分区,您也许可以再次查询而不是导出。

最后但同样重要的是,它还取决于频率 - 这是一次性交易(然后导出)还是连续过程(然后使用 table 装饰器仅查询最新数据)?

如果您想要一个真正有用的答案,则需要更多信息。

编辑

根据您对 table 大小的评论,我认为最好的方法是编写一个脚本:

  1. 将 table 导出为 GCS 作为换行符分隔 JSON

  2. 处理文件(逐行读取)并发送到 pub-sub

大多数编程语言都有 client libraries。我用 Python 做过类似的事情,而且相当简单。

2019更新:

现在 Pub/Sub 中的 click-to-bigquery 选项真的很容易:

找到它:https://console.cloud.google.com/cloudpubsub/topicList


据我所知,最简单的方法是通过 Google Cloud Dataflow,它本身知道如何访问 BigQuery 和 Pub/Sub。

理论上它应该像以下 Python 行一样简单:

p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
  beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
  beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))

这种 Python/Dataflow/BigQuery/PubSub 的组合今天不起作用(Python Dataflow 处于测试阶段,但 keep an eye on the changelog)。

我们可以对 Java 做同样的事情,而且效果很好——我刚刚测试过。它既可以在本地运行,也可以在托管的数据流运行器中运行:

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<TableRow> weatherData = p.apply(
        BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>() {
    @Override
    public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
        c.output(c.element().toString());
    }
})).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));

p.run();

测试消息是否存在:

gcloud --project myproject beta pubsub subscriptions  pull --auto-ack sub1

托管数据流屏幕截图: