什么可以限制数据流管道只使用一个工人?
What could restrict dataflow pipeline to only using a single worker?
我一直在尝试调试一个管道,该管道采用驱动后续 ParDo 操作的输入参数。由于我无法理解的原因,即使我禁用了自动缩放并设置了工作人员数量,管道也不会扩展到单个工作人员之外。可悲的是,GCP 上糟糕的数据流接口几乎没有说明无法扩展的问题。任何人都可以建议可能是什么问题或如何有效调试吗?
with beam.Pipeline(options=opts) as p:
result = (
p | "Initialize Pipeline" >> beam.Create(
[(f'gs://data/']) |
"Scan for extraction tasks" >> beam.ParDo(scanner.ScanForTasks()) |
"Extract data" >> beam.ParDo(worker.TaskToData()))
这个问题原来与名为“fusion”的数据流优化有关,其中相邻的操作融合在一起,大概是为了让它们可以 运行 在同一个工作器上无缝连接。问题在于,如果管道由生成大量下游任务的单个项目播种,则所有这些任务将在处理初始播种任务的同一个工作人员上处理。
解决方案是直接为管道播种任务,以防止 'optimization' 降低性能
def scan_for_tasks():
tasks = []
# Build your task list here
return tasks
with beam.Pipeline(options=opts) as p:
result = (
p | "Initialize Pipeline" >> beam.Create(scan_for_tasks()) |
"Extract data" >> beam.ParDo(worker.TaskToData()))
我一直在尝试调试一个管道,该管道采用驱动后续 ParDo 操作的输入参数。由于我无法理解的原因,即使我禁用了自动缩放并设置了工作人员数量,管道也不会扩展到单个工作人员之外。可悲的是,GCP 上糟糕的数据流接口几乎没有说明无法扩展的问题。任何人都可以建议可能是什么问题或如何有效调试吗?
with beam.Pipeline(options=opts) as p:
result = (
p | "Initialize Pipeline" >> beam.Create(
[(f'gs://data/']) |
"Scan for extraction tasks" >> beam.ParDo(scanner.ScanForTasks()) |
"Extract data" >> beam.ParDo(worker.TaskToData()))
这个问题原来与名为“fusion”的数据流优化有关,其中相邻的操作融合在一起,大概是为了让它们可以 运行 在同一个工作器上无缝连接。问题在于,如果管道由生成大量下游任务的单个项目播种,则所有这些任务将在处理初始播种任务的同一个工作人员上处理。
解决方案是直接为管道播种任务,以防止 'optimization' 降低性能
def scan_for_tasks():
tasks = []
# Build your task list here
return tasks
with beam.Pipeline(options=opts) as p:
result = (
p | "Initialize Pipeline" >> beam.Create(scan_for_tasks()) |
"Extract data" >> beam.ParDo(worker.TaskToData()))