控制 Apache Beam 数据流管道中的并行性

Control parallelism in Apache Beam Dataflow pipeline

我们正在试验 Apache Beam(使用 Go SDK)和 Dataflow 来并行化我们的一项耗时任务。对于更多上下文,我们有缓存作业,它接受一些查询,运行跨数据库并缓存它们。每个数据库查询可能需要几秒到几分钟,我们希望 运行 并行处理这些查询以便更快地完成任务。

创建了一个简单的管道,如下所示:

    // Create initial PCollection.
    startLoad := beam.Create(s, "InitialLoadToStartPipeline")

    // Emits a unit of work along with query and date range.
    cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)

    // Emits a cache response which includes errCode, errMsg, time etc.
    cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)

    ...

getCachePayloadsFn发射的单位数量不多,生产时多为数百,最多几千。

现在的问题是 cacheQueryDoFn 没有得到并行执行,查询正在一个接一个地按顺序执行。我们通过在缓存函数中记录 goroutine id、process id、开始和结束时间等来确认 StartBundleProcessElement 中的日志,以确认执行中没有重叠。

我们希望 运行 查询始终并行,即使只有 10 个查询。根据我们的理解和文档,它从整体输入和这些捆绑包 运行 并行创建捆绑包,并在捆绑包中按顺序创建捆绑包 运行s。有没有办法控制来自负载的捆绑包数量或任何增加并行度的方法?

我们尝试过的事情:

  1. 保持 num_workers=2autoscaling_algorithm=None。它启动了两个 VM,但 运行s Setup 方法仅在一个 VM 上初始化 DoFn 并将其用于整个负载。
  2. 找到 sdk_worker_parallelism 选项 here。但不确定如何正确设置它。尝试用 beam.PipelineOptions.Set("sdk_worker_parallelism", "50") 设置它。没有效果。

默认情况下,Create 不是并行的,所有的 DoFns 都被融合到与 Create 相同的阶段,因此它们也没有并行性。有关这方面的更多信息,请参阅 https://beam.apache.org/documentation/runtime/model/#dependent-parallellism

您可以使用 Reshuffle 转换明确强制熔断。