将 BigQuery table 流式传输到 Google Pub/Sub
Stream BigQuery table into Google Pub/Sub
-
google-cloud-storage
-
google-bigquery
-
google-cloud-platform
-
google-cloud-pubsub
-
google-cloud-dataflow
我有一个 Google bigQuery Table,我想将整个 table 流式传输到 pub-sub 主题
easy/fast 应该怎样做?
提前谢谢你,
这实际上取决于 table 的大小。
如果它很小 table(几千条记录,几个 doze 列),那么您可以设置一个进程来查询整个 table,将响应转换为 JSON 数组,并推送到 pub-sub.
如果它很大 table(millions/billions 条记录,数百列),您必须导出到文件,然后 prepare/ship 到 pub-sub
这也取决于您的分区策略 - 如果您的 table 设置为按日期分区,您也许可以再次查询而不是导出。
最后但同样重要的是,它还取决于频率 - 这是一次性交易(然后导出)还是连续过程(然后使用 table 装饰器仅查询最新数据)?
如果您想要一个真正有用的答案,则需要更多信息。
编辑
根据您对 table 大小的评论,我认为最好的方法是编写一个脚本:
将 table 导出为 GCS 作为换行符分隔 JSON
处理文件(逐行读取)并发送到 pub-sub
大多数编程语言都有 client libraries。我用 Python 做过类似的事情,而且相当简单。
2019更新:
现在 Pub/Sub 中的 click-to-bigquery 选项真的很容易:
找到它:https://console.cloud.google.com/cloudpubsub/topicList
据我所知,最简单的方法是通过 Google Cloud Dataflow,它本身知道如何访问 BigQuery 和 Pub/Sub。
理论上它应该像以下 Python 行一样简单:
p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))
这种 Python/Dataflow/BigQuery/PubSub 的组合今天不起作用(Python Dataflow 处于测试阶段,但 keep an eye on the changelog)。
我们可以对 Java 做同样的事情,而且效果很好——我刚刚测试过。它既可以在本地运行,也可以在托管的数据流运行器中运行:
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>() {
@Override
public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
c.output(c.element().toString());
}
})).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));
p.run();
测试消息是否存在:
gcloud --project myproject beta pubsub subscriptions pull --auto-ack sub1
托管数据流屏幕截图:
google-cloud-storage
google-bigquery
google-cloud-platform
google-cloud-pubsub
google-cloud-dataflow
我有一个 Google bigQuery Table,我想将整个 table 流式传输到 pub-sub 主题
easy/fast 应该怎样做?
提前谢谢你,
这实际上取决于 table 的大小。
如果它很小 table(几千条记录,几个 doze 列),那么您可以设置一个进程来查询整个 table,将响应转换为 JSON 数组,并推送到 pub-sub.
如果它很大 table(millions/billions 条记录,数百列),您必须导出到文件,然后 prepare/ship 到 pub-sub
这也取决于您的分区策略 - 如果您的 table 设置为按日期分区,您也许可以再次查询而不是导出。
最后但同样重要的是,它还取决于频率 - 这是一次性交易(然后导出)还是连续过程(然后使用 table 装饰器仅查询最新数据)?
如果您想要一个真正有用的答案,则需要更多信息。
编辑
根据您对 table 大小的评论,我认为最好的方法是编写一个脚本:
将 table 导出为 GCS 作为换行符分隔 JSON
处理文件(逐行读取)并发送到 pub-sub
大多数编程语言都有 client libraries。我用 Python 做过类似的事情,而且相当简单。
2019更新:
现在 Pub/Sub 中的 click-to-bigquery 选项真的很容易:
找到它:https://console.cloud.google.com/cloudpubsub/topicList
据我所知,最简单的方法是通过 Google Cloud Dataflow,它本身知道如何访问 BigQuery 和 Pub/Sub。
理论上它应该像以下 Python 行一样简单:
p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))
这种 Python/Dataflow/BigQuery/PubSub 的组合今天不起作用(Python Dataflow 处于测试阶段,但 keep an eye on the changelog)。
我们可以对 Java 做同样的事情,而且效果很好——我刚刚测试过。它既可以在本地运行,也可以在托管的数据流运行器中运行:
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>() {
@Override
public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
c.output(c.element().toString());
}
})).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));
p.run();
测试消息是否存在:
gcloud --project myproject beta pubsub subscriptions pull --auto-ack sub1
托管数据流屏幕截图: