Streaming pipelines with BigQuery sinks in Python
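I'm building an Apache Beam streaming pipeline whose source is Pub/Sub and whose sink is BigQuery. I get this error message:
"Workflow failed. Causes: Unknown message code."
The message is cryptic, but I now suspect that BigQuery is not supported as a sink for streaming pipelines in Python. The documentation says so here:
Streaming from Pub/Sub to BigQuery
Am I right that this is what causes the problem? And if it is not the cause, is the feature still unsupported anyway?
Can anyone tell me when this feature will be released? It's a shame, because I was looking forward to using it.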
Streaming pipelines in Python are experimentally available starting with Beam 2.5.0, as stated in the Beam documentation.
So you need to install apache-beam 2.5.0 and apache-beam[gcp]:
pip install apache-beam==2.5.0
pip install apache-beam[gcp]
I ran this command:
python pubsub_to_bq.py --runner DataflowRunner --input_topic=projects/pubsub-public-data/topics/taxirides-realtime --project <my-project> --temp_location gs://<my-bucket>/tmp --staging_location gs://<my-bucket>/staging --streaming
with the code below, and it works fine:
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam


def parse_pubsub(line):
    # Imported locally so the function does not depend on module-level
    # imports when it is shipped to the workers.
    import json
    record = json.loads(line)
    return (record['ride_id'], record['point_idx'], record['latitude'],
            record['longitude'], record['timestamp'],
            record['meter_increment'], record['ride_status'],
            record['meter_reading'], record['passenger_count'])


def to_bq_row(values):
    # Replaces the original lambda, whose tuple-unpacking parameter list
    # is valid only in Python 2.
    fields = ('ride_id', 'point_idx', 'latitude', 'longitude', 'timestamp',
              'meter_increment', 'ride_status', 'meter_reading',
              'passenger_count')
    return dict(zip(fields, values))


def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_topic', dest='input_topic', required=True,
        help='Input PubSub topic of the form '
             '"projects/<PROJECT>/topics/<TOPIC>".')
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(argv=pipeline_args) as p:
        # Read from PubSub
        lines = p | beam.io.ReadFromPubSub(known_args.input_topic)
        # Adapt messages from PubSub to the BQ table
        lines = lines | beam.Map(parse_pubsub)
        lines = lines | beam.Map(to_bq_row)
        # Write to a BQ table
        lines | beam.io.WriteToBigQuery(
            table='<my-table>', dataset='<my-dataset>', project='<my-project>')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
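The message-to-row step can be checked locally before launching a Dataflow job. The following is only a minimal sketch using the standard library: `message_to_row` is a hypothetical helper that combines the two `Map` steps, and the sample message is made up for illustration (real messages from the topic may look different or carry more keys).

```python
import json

# Keys the pipeline reads from each Pub/Sub message.
FIELDS = ('ride_id', 'point_idx', 'latitude', 'longitude', 'timestamp',
          'meter_increment', 'ride_status', 'meter_reading',
          'passenger_count')


def message_to_row(message):
    """Turn one JSON-encoded message into a dict keyed by column name."""
    record = json.loads(message)
    # Keep only the columns the table expects; other keys are dropped.
    return {field: record[field] for field in FIELDS}


# Made-up sample message (assumption, not real data from the topic).
sample = json.dumps({
    'ride_id': 'abc-123',
    'point_idx': 7,
    'latitude': 40.75,
    'longitude': -73.99,
    'timestamp': '2018-07-01T12:00:00Z',
    'meter_increment': 0.05,
    'ride_status': 'enroute',
    'meter_reading': 3.5,
    'passenger_count': 2,
    'extra_key': 'not written to the table',
})

row = message_to_row(sample)
print(row)
```

A message that lacks one of the nine keys raises a `KeyError` here, which is the same failure the pipeline would hit on the workers.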
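The pipeline uses the publicly available topic projects/pubsub-public-data/topics/taxirides-realtime (passed with --input_topic) and a BQ table that I created with the right schema.
If you run this pipeline, be careful not to leave it running, or you will incur costs, because this PubSub topic delivers a large number of messages.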
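The answer does not show the schema of the table. As a rough sketch only: the column types below are an assumption inferred from the field names, not something the answer states, so adjust them to your actual table. `WriteToBigQuery` accepts a schema written as a comma-separated string of `name:TYPE` pairs, which the hypothetical helper `schema_string` builds here.

```python
# Assumed column types (not from the original answer); adjust as needed.
COLUMNS = [
    ('ride_id', 'STRING'),
    ('point_idx', 'INTEGER'),
    ('latitude', 'FLOAT'),
    ('longitude', 'FLOAT'),
    ('timestamp', 'TIMESTAMP'),
    ('meter_increment', 'FLOAT'),
    ('ride_status', 'STRING'),
    ('meter_reading', 'FLOAT'),
    ('passenger_count', 'INTEGER'),
]


def schema_string(columns):
    """Join (name, type) pairs into a 'name:TYPE,name:TYPE' string."""
    return ','.join('{}:{}'.format(name, col_type)
                    for name, col_type in columns)


SCHEMA = schema_string(COLUMNS)
print(SCHEMA)
```

If the table does not exist yet, passing `schema=SCHEMA` to `beam.io.WriteToBigQuery` should let the sink create it; with a table created in advance, as in the answer, the argument can be left out.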