How do you start a workflow from another workflow and retrieve the return value of the called workflow?

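I am testing Google Workflows and want to call one workflow from another, but as a separate process (not as a subworkflow).

I can start the execution, but so far I cannot retrieve the return value. Instead, I get back an execution instance: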

{
     "argument": "null",
     "name": "projects/xxxxxxxxxxxx/locations/us-central1/workflows/child-workflow/executions/9fb4aa01-2585-42e7-a79f-cfb4b57b22d4",
     "startTime": "2020-12-09T01:38:07.073406981Z",
     "state": "ACTIVE",
     "workflowRevisionId": "000003-cf3"
 }

parent-workflow.yaml

main:
    params: [args]
    steps:
        - callChild:
            call: http.post
            args:
                url: 'https://workflowexecutions.googleapis.com/v1beta/projects/my-project/locations/us-central1/workflows/child-workflow/executions'
                auth:
                    type: OAuth2
                    scope: 'https://www.googleapis.com/auth/cloud-platform'
            result: callresult
        - returnValue:
            return: ${callresult.body}

child-workflow.yaml:

    - getCurrentTime:
        call: http.get
        args:
            url: https://us-central1-workflowsample.cloudfunctions.net/datetime
        result: CurrentDateTime
    - readWikipedia:
        call: http.get
        args:
            url: https://en.wikipedia.org/w/api.php
            query:
                action: opensearch
                search: ${CurrentDateTime.body.dayOfTheWeek}
        result: WikiResult
    - returnOutput:
        return: ${WikiResult.body[1]}

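A second question: how do I build a dynamic URL from a variable? ${} does not seem to work there.

Because starting an execution is an asynchronous API call, you need to poll the execution to find out when it has finished.

You could use an algorithm like the following: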

main:
  steps:  
    - callChild:
        call: http.post
        args:
            url: ${"https://workflowexecutions.googleapis.com/v1beta/projects/"+sys.get_env("GOOGLE_CLOUD_PROJECT_ID")+"/locations/us-central1/workflows/http_bitly_secrets/executions"}
            auth: 
                type: OAuth2
                scope: 'https://www.googleapis.com/auth/cloud-platform'
        result: workflow
    - waitExecution:
        call: CloudWorkflowsWaitExecution
        args:
          execution: ${workflow.body.name}
        result: workflow
    - returnValue:
        return: ${workflow}
CloudWorkflowsWaitExecution:
  params: [execution]
  steps:
    - init:
        assign:
          - i: 0
          - valid_states: ["ACTIVE","STATE_UNSPECIFIED"]
          - result: 
              state: ACTIVE
    - check_condition:
        switch:
          - condition: ${result.state in valid_states and i < 100}
            next: iterate
        next: exit_loop
    - iterate:
        steps:
          - sleep:
              call: sys.sleep
              args:
                seconds: 10
          - process_item:
              call: http.get
              args:
                url: ${"https://workflowexecutions.googleapis.com/v1beta/"+execution}
                auth:
                  type: OAuth2
              result: result
          - assign_loop:
              assign:
                - i: ${i+1}
                - result: ${result.body} 
        next: check_condition
    - exit_loop:
        return: ${result}

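What you see here is a CloudWorkflowsWaitExecution subworkflow that polls the execution at most 100 times, sleeping 10 seconds before each poll. It stops as soon as the execution is no longer in the ACTIVE or STATE_UNSPECIFIED state, and returns the execution record.

The output is: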
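
On the dynamic URL question: the callChild step above already shows the pattern. As far as I know, ${} is only evaluated when the whole value is a single expression, so a ${} embedded in the middle of a quoted URL is left as literal text. Here is a minimal sketch of the same idea with the child workflow name taken from a variable. The childName input field is a hypothetical name chosen for this illustration; it is not part of the original question.

```yaml
main:
  params: [args]
  steps:
    - init:
        assign:
          # Assumption: the caller passes {"childName": "..."} as the execution argument.
          - childName: ${args.childName}
          - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
    - callChild:
        call: http.post
        args:
          # The entire value is one ${...} expression; the literal parts are
          # quoted strings joined to the variables with "+".
          url: ${"https://workflowexecutions.googleapis.com/v1beta/projects/" + project + "/locations/us-central1/workflows/" + childName + "/executions"}
          auth:
            type: OAuth2
        result: callresult
    - returnValue:
        return: ${callresult.body}
```

This only starts the child execution; the polling subworkflow above is still needed to get its return value.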

argument: 'null'
endTime: '2020-12-09T13:00:11.099830035Z'
name: projects/985596417983/locations/us-central1/workflows/call_another_workflow/executions/05eeefb5-60bb-4b20-84bd-29f6338fa66b
result: '{"argument":"null","endTime":"2020-12-09T13:00:00.976951808Z","name":"projects/985596417983/locations/us-central1/workflows/http_bitly_secrets/executions/2f4b749c-4283-4c6b-b5c6-e04bbcd57230","result":"{\"archived\":false,\"created_at\":\"2020-10-17T11:12:31+0000\",\"custom_bitlinks\":[],\"deeplinks\":[],\"id\":\"j.mp/2SZaSQK\",\"link\":\"//<edited>/2SZaSQK\",\"long_url\":\"https://cloud.google.com/blog\",\"references\":{\"group\":\"https://api-ssl.bitly.com/v4/groups/Bg7eeADYBa9\"},\"tags\":[]}","startTime":"2020-12-09T13:00:00.577579042Z","state":"SUCCEEDED","workflowRevisionId":"000001-478"}'
startTime: '2020-12-09T13:00:00.353800247Z'
state: SUCCEEDED
workflowRevisionId: 000012-cb8

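The result field is a JSON-encoded string. Nested inside it is another result entry, which holds the return value of the external (child) workflow execution.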
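
If you want the parent to return the child's value as structured data rather than the whole execution record, something like the steps below could replace the final returnValue step of main. This is a sketch under two assumptions: that workflow.result is a JSON string, as in the output above, and that a child which did not end in SUCCEEDED (including a polling timeout, where the state is still ACTIVE) should make the parent fail. The step names checkState and childFailed are made up for this example.

```yaml
    - checkState:
        switch:
          # Anything other than SUCCEEDED is treated as a failure here.
          - condition: ${workflow.state != "SUCCEEDED"}
            next: childFailed
    - returnValue:
        # json.decode turns the JSON string into a map, list, or scalar value.
        return: ${json.decode(workflow.result)}
    - childFailed:
        raise: ${"Child execution ended in state " + workflow.state}
```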

The best approach now is the workflows.executions.run helper method, which formats the request and blocks until the workflow execution completes:

- run_execution:
    try:
      call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
      args:
        workflow_id: ${workflow}
        location: ${location}   # Defaults to current location
        project_id: ${project}  # Defaults to current project
        argument: ${arguments}  # Arguments could be specified inline as a map instead.
      result: r1
    except:
      as: e
      steps: ... # handle a failed execution
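
For context, here is how that step might sit inside a complete parent workflow. Treat it as a sketch: the child-workflow ID, the searchTerm argument, and the shape of the map returned on failure are all assumptions made for illustration, and I have not confirmed the exact structure of the value stored in child_execution, so it is returned unchanged.

```yaml
main:
  steps:
    - run_child:
        try:
          call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
          args:
            workflow_id: child-workflow   # assumed child workflow name
            # location and project_id are omitted, so the current ones are used.
            argument:
              searchTerm: Monday          # hypothetical input for the child
          result: child_execution
        except:
          as: e
          steps:
            - report_failure:
                # One possible way to handle a failed execution: return the error.
                return:
                  status: failed
                  error: ${e}
    - return_result:
        return: ${child_execution}
```

Compared with the polling version, there is no loop to maintain, at the cost of the parent step staying blocked for as long as the child runs.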