Argo Workflows 中的动态 "Fan In"

Dynamic "Fan In" in Argo Workflows

Argo 允许基于先前步骤的输出动态生成并行工作流步骤。

此处提供了此动态工作流生成的示例:https://github.com/argoproj/argo-workflows/blob/master/examples/loops-param-result.yaml

我正在尝试创建一个类似的工作流程,其中最后一个 'fan-in' 步骤将从动态创建的并行步骤中读取输出。这是一个尝试:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: write
        template: output-number
        arguments:
          parameters:
          - name: number
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"
    - - name: fan-in
        template: fan-in
        arguments:
          parameters:
          - name: numbers
            value: "{{steps.write.outputs.parameters.number}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: output-number
    inputs:
      parameters:
      - name: number
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
    outputs:
      parameters:
        - name: number
          valueFrom:
            path: /tmp/number.txt

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo received {{inputs.parameters.numbers}}"]

我能够提交此工作流,并且它运行成功。不幸的是,最后 fan-in 步骤的输出如下所示:

fan-in: received {{steps.write.outputs.parameters.number}}

输入 numbers 参数的值未被插值。关于如何让它工作的任何想法?

可通过 steps.STEP-NAME.outputs.parameters 访问汇总的步骤输出参数。无法按名称访问一个参数的一组聚合输出。

对您的工作流程的这种细微改变应该可以满足您的需求:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: write
        template: output-number
        arguments:
          parameters:
          - name: number
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"
    - - name: fan-in
        template: fan-in
        arguments:
          parameters:
          - name: numbers
            value: "{{steps.write.outputs.parameters}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: output-number
    inputs:
      parameters:
      - name: number
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
    outputs:
      parameters:
        - name: number
          valueFrom:
            path: /tmp/number.txt

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo received {{inputs.parameters.numbers}}"]

唯一的变化是从 {{steps.write.outputs.parameters.number}} 中删除 .number

这是新的输出:

received [{number:20},{number:21},{number:22},{number:23},{number:24},{number:25},{number:26},{number:27},{number:28},{number:29},{number:30}]

这里是 GitHub issue where output parameter aggregation was discussed/created.

我已经放入 enhancement proposal 以按名称访问聚合输出参数。