什么可以限制数据流管道只使用一个工人?

What could restrict dataflow pipeline to only using a single worker?

我一直在尝试调试一个管道,该管道采用驱动后续 ParDo 操作的输入参数。由于我无法理解的原因,即使我禁用了自动缩放并设置了工作人员数量,管道也不会扩展到单个工作人员之外。可悲的是,GCP 上糟糕的数据流接口几乎没有说明无法扩展的问题。任何人都可以建议可能是什么问题或如何有效调试吗?

with beam.Pipeline(options=opts) as p:
  result = (
      p | "Initialize Pipeline" >> beam.Create(
          [(f'gs://data/']) |
      "Scan for extraction tasks" >> beam.ParDo(scanner.ScanForTasks()) |
      "Extract data" >> beam.ParDo(worker.TaskToData()))

这个问题原来与名为“fusion”的数据流优化有关,其中相邻的操作融合在一起,大概是为了让它们可以 运行 在同一个工作器上无缝连接。问题在于,如果管道由生成大量下游任务的单个项目播种,则所有这些任务将在处理初始播种任务的同一个工作人员上处理。

解决方案是直接为管道播种任务,以防止 'optimization' 降低性能

def scan_for_tasks():
  tasks = []
  # Build your task list here
  return tasks

with beam.Pipeline(options=opts) as p:
  result = (
    p | "Initialize Pipeline" >> beam.Create(scan_for_tasks()) |
    "Extract data" >> beam.ParDo(worker.TaskToData()))