Pubsub Hook to Bigquery
I have a data pipeline: App Engine publishes to Pub/Sub, which pushes to BigQuery. In the example in the docs at https://cloud.google.com/python/getting-started/using-pub-sub, Pub/Sub pushes to a worker hosted in App Engine, and that worker then processes the data (in my case, writing it to the appropriate BigQuery table). However, is it possible to have Pub/Sub push directly into a BigQuery table via a subscription?
Currently, there is no automatic way to push data to BigQuery. The two options are:
Write a subscriber that pulls messages from a Google Cloud Pub/Sub subscription and writes them to BigQuery (a minimal sketch is shown below).
Use Google Cloud Dataflow to read via a Pub/Sub I/O and write via a BigQuery I/O.
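For the first option, here is a minimal sketch of such a subscriber, assuming the google-cloud-pubsub and google-cloud-bigquery client libraries, placeholder project/subscription/table names, and messages whose payload is a JSON object matching the table schema:

from google.cloud import bigquery, pubsub_v1
import json

# Placeholder resource names -- replace with your own.
PROJECT_ID = 'your-project-id'
SUBSCRIPTION_ID = 'your-subscription'
TABLE_ID = 'your-project-id.your_dataset.your_table'

bq_client = bigquery.Client(project=PROJECT_ID)
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

def callback(message):
    # Assumes the message payload is a JSON object matching the table schema.
    row = json.loads(message.data.decode('utf-8'))
    errors = bq_client.insert_rows_json(TABLE_ID, [row])  # streaming insert
    if not errors:
        message.ack()
    else:
        print('BigQuery insert errors:', errors)
        message.nack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()

With this approach the subscriber itself is responsible for retries and error handling; the Dataflow option below hands that off to the runner.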
I am using Dataflow with Apache Beam to process Pub/Sub messages into a BigQuery table.
import apache_beam as beam
import apache_beam.io
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions, StandardOptions
import json
# Pub/Sub resource names (TOPIC is unused below; the pipeline reads from the subscription).
TOPIC = 'projects/your-project-id/topics/your-topic'
SUBSCRIPTION = 'projects/your-project-id/subscriptions/your-subscription'
# Configure a streaming job on the Dataflow runner.
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'your-project-id'
google_cloud_options.job_name = 'your-beam-job'
google_cloud_options.staging_location = 'gs://your-bucket/staging'
google_cloud_options.temp_location = 'gs://your-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True
class FormatDoFn(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        # Emit one BigQuery row per Pub/Sub message, built from its attributes.
        print({'data': json.dumps(element.attributes['data'])})
        return [{'data': json.dumps(element.attributes['data']),
                 'schema': element.attributes['schema']}]

with beam.Pipeline(options=options) as gcp:
    messages = (gcp | beam.io.ReadFromPubSub(subscription=SUBSCRIPTION,
                                             with_attributes=True))

    # Do some schema validation here and output errors.
    def printattr(element):  # unused debug helper
        print(element)

    lines = messages | beam.ParDo(FormatDoFn())

    lines | 'Write' >> beam.io.WriteToBigQuery(
        'wf-us-virtualmedia-sandbox:jstafford_dataset.jstafford_table',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    # The with-block runs the pipeline on exit, so no explicit gcp.run() is needed.
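Note that with CREATE_IF_NEEDED the sink needs a table schema if the table does not already exist. A sketch of passing one, assuming the two string fields produced by FormatDoFn above:

    lines | 'Write' >> beam.io.WriteToBigQuery(
        'wf-us-virtualmedia-sandbox:jstafford_dataset.jstafford_table',
        schema='data:STRING,schema:STRING',  # assumed field types; adjust to your data
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

With the runner set to DataflowRunner and streaming enabled in the options, running the script submits a streaming Dataflow job.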