如何使用 Google Pub/Sub 和 Google Dataflow/Beam 使用 Python?
How to use Google Pub/Sub with Google Dataflow/Beam using Python?
我是 Pub/Sub 和 Dataflow/Beam 的新手。我在 Spark 和 Kafka 中完成了一项任务,我想使用 Pub/Sub 和 Dataflow/Beam 来做同样的事情。到目前为止,据我了解,Kafka 类似于 Pub/Sub,而 Spark 类似于 Dataflow/Beam。
任务是获取 JSON 文件并写入 Pub/Sub 主题。然后使用 Beam/Dataflow 我需要将该数据放入 PCollection 中。我将如何实现这一目标?
Pubsub 是一个流式源/接收器(只对它 read/write 一次没有意义)。 Dataflow python SDK 对流的支持尚不可用。
文档:https://cloud.google.com/dataflow/release-notes/release-notes-python。
流式传输可用后,您应该可以轻松完成此操作。
但是,如果您是从文件 -> pubsub 然后是 pubsub -> pcollection,您应该能够使用批处理管道执行此操作并删除 pubsub 方面。可以看看beam的基本文件io。
我解决了上面的问题。我能够持续从 pubsub 主题读取数据,然后进行一些处理,然后将结果写入数据存储。
with beam.Pipeline(options=options) as p:
# Read from PubSub into a PCollection.
lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
# Group and aggregate each JSON object.
transformed = (lines
| 'Split' >> beam.FlatMap(lambda x: x.split("\n"))
| 'jsonParse' >> beam.ParDo(jsonParse())
| beam.WindowInto(window.FixedWindows(15,0))
| 'Combine' >> beam.CombinePerKey(sum))
# Create Entity.
transformed = transformed | 'create entity' >> beam.Map(
EntityWrapper(config.NAMESPACE, config.KIND, config.ANCESTOR).make_entity)
# Write to Datastore.
transformed | 'write to datastore' >> WriteToDatastore(known_args.dataset_id)
我是 Pub/Sub 和 Dataflow/Beam 的新手。我在 Spark 和 Kafka 中完成了一项任务,我想使用 Pub/Sub 和 Dataflow/Beam 来做同样的事情。到目前为止,据我了解,Kafka 类似于 Pub/Sub,而 Spark 类似于 Dataflow/Beam。
任务是获取 JSON 文件并写入 Pub/Sub 主题。然后使用 Beam/Dataflow 我需要将该数据放入 PCollection 中。我将如何实现这一目标?
Pubsub 是一个流式源/接收器(只对它 read/write 一次没有意义)。 Dataflow python SDK 对流的支持尚不可用。
文档:https://cloud.google.com/dataflow/release-notes/release-notes-python。
流式传输可用后,您应该可以轻松完成此操作。
但是,如果您是从文件 -> pubsub 然后是 pubsub -> pcollection,您应该能够使用批处理管道执行此操作并删除 pubsub 方面。可以看看beam的基本文件io。
我解决了上面的问题。我能够持续从 pubsub 主题读取数据,然后进行一些处理,然后将结果写入数据存储。
with beam.Pipeline(options=options) as p:
# Read from PubSub into a PCollection.
lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
# Group and aggregate each JSON object.
transformed = (lines
| 'Split' >> beam.FlatMap(lambda x: x.split("\n"))
| 'jsonParse' >> beam.ParDo(jsonParse())
| beam.WindowInto(window.FixedWindows(15,0))
| 'Combine' >> beam.CombinePerKey(sum))
# Create Entity.
transformed = transformed | 'create entity' >> beam.Map(
EntityWrapper(config.NAMESPACE, config.KIND, config.ANCESTOR).make_entity)
# Write to Datastore.
transformed | 'write to datastore' >> WriteToDatastore(known_args.dataset_id)