使用 BatchElements 进行批处理在 DirectRunner 和 DataflowRunner 中的工作方式不同 (GCP/Dataflow)

Batching with BatchElements works differently in DirectRunner and DataflowRunner (GCP/Dataflow)

我正在使用 apache beam(GCP 数据流)和 python 构建管道,我的管道如下所示:

...
with beam.Pipeline(options=self.pipeline_options) as pipeline:
            somepipeline = (
                pipeline
                | "ReadPubSubMessage" >> ReadFromPubSub(
                    subscription=self.custom_options.some_subscription)
                | "Windowing" >> beam.WindowInto(beam.window.FixedWindows(30))
                | "DecodePubSubMessage" >> beam.ParDo(DecodePubSubMessage()).with_outputs(ERROR_OUTPUT_NAME, main=MAIN_OUTPUT_NAME)
                | "Geting and sorting listings" >> beam.ParDo(SortByCompletion())
                | "Batching listings" >> beam.BatchElements(min_batch_size=3,max_batch_size=3) 
                | "Print logs" >> beam.Map(logging.info)
            )
...

当我通过 DirectRunner 运行 管道时,一切都按预期工作(您可以看到 1 个批次,其中包含 3 个元素):

但是当我 运行 使用 DataflowRunner 的相同代码时,我得到了这个结果(您可以看到 3 个批次,每个批次中有 1 个元素):

即使我 运行 并行使用此管道(在两个终端 windows 中),也会发生这种情况。两者都 运行 带有流媒体标志。消息立即通过 python 脚本一条一条发送到 pubsub。

问题: 是什么导致 DataflowRunner 出现此问题(我的假设是数据流中的工作人员数量,但当我检查它时,这项工作中只有 1 名工作人员)以及如何获得与通过 DirrectRunner 相同的结果。

谢谢!

BatchElements 是不确定的,不会跨包进行批处理。直接运行器非常简单,将整个 PCollection 放入一个包中,但 Dataflow 被编写为分布式运行器,即使只有一个工作人员,也可能同时存在多个包 运行(例如在不同线程上)并且捆绑往往相当小。

您可以考虑使用 Beam 的 GroupIntoBatches,它在流式传输模式下效果更好(尽管这需要选择一个用于批处理的密钥)。