使用 BatchElements 进行批处理在 DirectRunner 和 DataflowRunner 中的工作方式不同 (GCP/Dataflow)
Batching with BatchElements works differently in DirectRunner and DataflowRunner (GCP/Dataflow)
我正在使用 apache beam(GCP 数据流)和 python 构建管道,我的管道如下所示:
...
with beam.Pipeline(options=self.pipeline_options) as pipeline:
somepipeline = (
pipeline
| "ReadPubSubMessage" >> ReadFromPubSub(
subscription=self.custom_options.some_subscription)
| "Windowing" >> beam.WindowInto(beam.window.FixedWindows(30))
| "DecodePubSubMessage" >> beam.ParDo(DecodePubSubMessage()).with_outputs(ERROR_OUTPUT_NAME, main=MAIN_OUTPUT_NAME)
| "Geting and sorting listings" >> beam.ParDo(SortByCompletion())
| "Batching listings" >> beam.BatchElements(min_batch_size=3,max_batch_size=3)
| "Print logs" >> beam.Map(logging.info)
)
...
当我通过 DirectRunner 运行 管道时,一切都按预期工作(您可以看到 1 个批次,其中包含 3 个元素):
但是当我 运行 使用 DataflowRunner 的相同代码时,我得到了这个结果(您可以看到 3 个批次,每个批次中有 1 个元素):
即使我 运行 并行使用此管道(在两个终端 windows 中),也会发生这种情况。两者都 运行 带有流媒体标志。消息立即通过 python 脚本一条一条发送到 pubsub。
问题:
是什么导致 DataflowRunner 出现此问题(我的假设是数据流中的工作人员数量,但当我检查它时,这项工作中只有 1 名工作人员)以及如何获得与通过 DirrectRunner 相同的结果。
谢谢!
BatchElements
是不确定的,不会跨包进行批处理。直接运行器非常简单,将整个 PCollection 放入一个包中,但 Dataflow 被编写为分布式运行器,即使只有一个工作人员,也可能同时存在多个包 运行(例如在不同线程上)并且捆绑往往相当小。
您可以考虑使用 Beam 的 GroupIntoBatches,它在流式传输模式下效果更好(尽管这需要选择一个用于批处理的密钥)。
我正在使用 apache beam(GCP 数据流)和 python 构建管道,我的管道如下所示:
...
with beam.Pipeline(options=self.pipeline_options) as pipeline:
somepipeline = (
pipeline
| "ReadPubSubMessage" >> ReadFromPubSub(
subscription=self.custom_options.some_subscription)
| "Windowing" >> beam.WindowInto(beam.window.FixedWindows(30))
| "DecodePubSubMessage" >> beam.ParDo(DecodePubSubMessage()).with_outputs(ERROR_OUTPUT_NAME, main=MAIN_OUTPUT_NAME)
| "Geting and sorting listings" >> beam.ParDo(SortByCompletion())
| "Batching listings" >> beam.BatchElements(min_batch_size=3,max_batch_size=3)
| "Print logs" >> beam.Map(logging.info)
)
...
当我通过 DirectRunner 运行 管道时,一切都按预期工作(您可以看到 1 个批次,其中包含 3 个元素):
但是当我 运行 使用 DataflowRunner 的相同代码时,我得到了这个结果(您可以看到 3 个批次,每个批次中有 1 个元素):
即使我 运行 并行使用此管道(在两个终端 windows 中),也会发生这种情况。两者都 运行 带有流媒体标志。消息立即通过 python 脚本一条一条发送到 pubsub。
问题: 是什么导致 DataflowRunner 出现此问题(我的假设是数据流中的工作人员数量,但当我检查它时,这项工作中只有 1 名工作人员)以及如何获得与通过 DirrectRunner 相同的结果。
谢谢!
BatchElements
是不确定的,不会跨包进行批处理。直接运行器非常简单,将整个 PCollection 放入一个包中,但 Dataflow 被编写为分布式运行器,即使只有一个工作人员,也可能同时存在多个包 运行(例如在不同线程上)并且捆绑往往相当小。
您可以考虑使用 Beam 的 GroupIntoBatches,它在流式传输模式下效果更好(尽管这需要选择一个用于批处理的密钥)。