BigQuery Table Pub Sub 主题在 Apache Beam Python SDK 中不起作用? Streaming Sink 的静态源
BigQuery Table a Pub Sub Topic not working in Apache Beam Python SDK? Static source to Streaming Sink
我的基本要求是创建一个从 BigQuery Table 读取的管道,然后将其转换为 JSON 并将其传递到 PubSub 主题。
起初我从 Big Query 读取并试图将其写入 Pub Sub Topic 但得到了 exception error saying "Pub Sub" is not supported for batch pipelines
。所以我尝试了一些解决方法
我在 python 中通过
解决了这个问题
- 从 BigQuery 中读取-> ConvertTo JSON 字符串-> 在云存储中另存为文本文件(Beam 管道)
p = beam.Pipeline(options=options)
json_string_output = (
p
| 'Read from BQ' >> beam.io.ReadFromBigQuery(
query='SELECT * FROM '\
'`project.dataset.table_name`',
use_standard_sql=True)
| 'convert to json' >> beam.Map(lambda record: json.dumps(record))
| 'Write results' >> beam.io.WriteToText(outputs_prefix)
)
p.run()
- 然后从那里 运行 一个普通的 python 脚本从文件中读取行并将其传递到 PubSub 主题
# create publisher
publisher = pubsub_v1.PublisherClient()
with open(input_file, 'rb') as ifp:
header = ifp.readline()
# loop over each record
for line in ifp:
event_data = line # entire line of input file is the message
print('Publishing {0} to {1}'.format(event_data, pubsub_topic))
publisher.publish(pubsub_topic, event_data)
我找不到将两个脚本集成到单个 ApacheBeam 管道中的方法。
因为您的管道没有任何无限制的 PCollections,它会自动 运行 在批处理模式下。您可以使用 --streaming
命令行标志在流模式下强制管道 运行。
我的基本要求是创建一个从 BigQuery Table 读取的管道,然后将其转换为 JSON 并将其传递到 PubSub 主题。
起初我从 Big Query 读取并试图将其写入 Pub Sub Topic 但得到了 exception error saying "Pub Sub" is not supported for batch pipelines
。所以我尝试了一些解决方法
我在 python 中通过
解决了这个问题- 从 BigQuery 中读取-> ConvertTo JSON 字符串-> 在云存储中另存为文本文件(Beam 管道)
p = beam.Pipeline(options=options)
json_string_output = (
p
| 'Read from BQ' >> beam.io.ReadFromBigQuery(
query='SELECT * FROM '\
'`project.dataset.table_name`',
use_standard_sql=True)
| 'convert to json' >> beam.Map(lambda record: json.dumps(record))
| 'Write results' >> beam.io.WriteToText(outputs_prefix)
)
p.run()
- 然后从那里 运行 一个普通的 python 脚本从文件中读取行并将其传递到 PubSub 主题
# create publisher
publisher = pubsub_v1.PublisherClient()
with open(input_file, 'rb') as ifp:
header = ifp.readline()
# loop over each record
for line in ifp:
event_data = line # entire line of input file is the message
print('Publishing {0} to {1}'.format(event_data, pubsub_topic))
publisher.publish(pubsub_topic, event_data)
我找不到将两个脚本集成到单个 ApacheBeam 管道中的方法。
因为您的管道没有任何无限制的 PCollections,它会自动 运行 在批处理模式下。您可以使用 --streaming
命令行标志在流模式下强制管道 运行。