控制 Apache Beam 数据流管道中的并行性
Control parallelism in Apache Beam Dataflow pipeline
我们正在试验 Apache Beam(使用 Go SDK)和 Dataflow 来并行化我们的一项耗时任务。对于更多上下文,我们有缓存作业,它接受一些查询,运行跨数据库并缓存它们。每个数据库查询可能需要几秒到几分钟,我们希望 运行 并行处理这些查询以便更快地完成任务。
创建了一个简单的管道,如下所示:
// Create initial PCollection.
startLoad := beam.Create(s, "InitialLoadToStartPipeline")
// Emits a unit of work along with query and date range.
cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)
// Emits a cache response which includes errCode, errMsg, time etc.
cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)
...
getCachePayloadsFn
发射的单位数量不多,生产时多为数百,最多几千。
现在的问题是 cacheQueryDoFn
没有得到并行执行,查询正在一个接一个地按顺序执行。我们通过在缓存函数中记录 goroutine id、process id、开始和结束时间等来确认 StartBundle
和 ProcessElement
中的日志,以确认执行中没有重叠。
我们希望 运行 查询始终并行,即使只有 10 个查询。根据我们的理解和文档,它从整体输入和这些捆绑包 运行 并行创建捆绑包,并在捆绑包中按顺序创建捆绑包 运行s。有没有办法控制来自负载的捆绑包数量或任何增加并行度的方法?
我们尝试过的事情:
- 保持
num_workers=2
和 autoscaling_algorithm=None
。它启动了两个 VM,但 运行s Setup
方法仅在一个 VM 上初始化 DoFn 并将其用于整个负载。
- 找到
sdk_worker_parallelism
选项 here。但不确定如何正确设置它。尝试用 beam.PipelineOptions.Set("sdk_worker_parallelism", "50")
设置它。没有效果。
默认情况下,Create 不是并行的,所有的 DoFns 都被融合到与 Create 相同的阶段,因此它们也没有并行性。有关这方面的更多信息,请参阅 https://beam.apache.org/documentation/runtime/model/#dependent-parallellism。
您可以使用 Reshuffle 转换明确强制熔断。
我们正在试验 Apache Beam(使用 Go SDK)和 Dataflow 来并行化我们的一项耗时任务。对于更多上下文,我们有缓存作业,它接受一些查询,运行跨数据库并缓存它们。每个数据库查询可能需要几秒到几分钟,我们希望 运行 并行处理这些查询以便更快地完成任务。
创建了一个简单的管道,如下所示:
// Create initial PCollection.
startLoad := beam.Create(s, "InitialLoadToStartPipeline")
// Emits a unit of work along with query and date range.
cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)
// Emits a cache response which includes errCode, errMsg, time etc.
cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)
...
getCachePayloadsFn
发射的单位数量不多,生产时多为数百,最多几千。
现在的问题是 cacheQueryDoFn
没有得到并行执行,查询正在一个接一个地按顺序执行。我们通过在缓存函数中记录 goroutine id、process id、开始和结束时间等来确认 StartBundle
和 ProcessElement
中的日志,以确认执行中没有重叠。
我们希望 运行 查询始终并行,即使只有 10 个查询。根据我们的理解和文档,它从整体输入和这些捆绑包 运行 并行创建捆绑包,并在捆绑包中按顺序创建捆绑包 运行s。有没有办法控制来自负载的捆绑包数量或任何增加并行度的方法?
我们尝试过的事情:
- 保持
num_workers=2
和autoscaling_algorithm=None
。它启动了两个 VM,但 运行sSetup
方法仅在一个 VM 上初始化 DoFn 并将其用于整个负载。 - 找到
sdk_worker_parallelism
选项 here。但不确定如何正确设置它。尝试用beam.PipelineOptions.Set("sdk_worker_parallelism", "50")
设置它。没有效果。
默认情况下,Create 不是并行的,所有的 DoFns 都被融合到与 Create 相同的阶段,因此它们也没有并行性。有关这方面的更多信息,请参阅 https://beam.apache.org/documentation/runtime/model/#dependent-parallellism。
您可以使用 Reshuffle 转换明确强制熔断。