如何在 AWS Lambda 函数中获取任务令牌

How to get Task Token in AWS Lambda Function

我有一个 AWS Step Function,其处理程序在 Java 中实现。

我的步进函数定义:

definition:
  Comment: Steps for issuing a card
  StartAt: RecipientFraudChecks
  States:
    RecipientFraudChecks:
      Type: Task
      Next: SaveTaskToken
      Resource: arn:aws:lambda:eu-west-1:099720403855:RecipientFraudChecks
    SaveTaskToken:
      Type: Task
      Resource: arn:aws:lambda:eu-west-1:12345678:function:SaveTaskToken
      End: true

我有一个 Java 项目,所有 Lambda 函数处理程序都在其中定义:

public class SaveTaskToken implements RequestHandler<Map<String,String>, String> {

   ....

   @Override
   public String handleRequest(Map<String, String> input, final Context context) {

      // do the fraud checks
      System.out.println("the context is: " + gson.toJson(context));
      System.out.println("input: " + gson.toJson(input));

}

我是运行本地使用AWS SAM的step函数,并根据这个触发:https://docs.aws.amazon.com/step-functions/latest/dg/sfn-local-lambda.html#install-sam

context 我希望看到任务令牌,但我没有。日志显示:

the context is: {
  "memoryLimit": 512,
  "awsRequestId": "5065a9aa-1a4a-46fe-9b58-7dc2194f92b7",
  "logGroupName": "aws/lambda/SaveTaskToken",
  "logStreamName": "$LATEST",
  "functionName": "SaveTaskToken",
  "functionVersion": "$LATEST",
  "invokedFunctionArn": "",
  "cognitoIdentity": {
    "identityId": "",
    "poolId": ""
  },
  "logger": {}
}

事实上,它与我期望的全局上下文完全不同 in the docs

我做错了什么?如何获得任务令牌?

编辑

我将 Parameters 属性 添加到 'SaveTaskToken' 并将资源更改为 arn:aws:states:::lambda:invoke.waitForTaskToken 并且知道我可以获得任务令牌:

definition:
  Comment: Steps for issuing a card
  StartAt: RecipientFraudChecks
  States:
    RecipientFraudChecks:
      Type: Task
      Next: SaveTaskToken
      Resource: arn:aws:lambda:eu-west-1:099720403855:RecipientFraudChecks
    SaveTaskToken:
      Type: Task
      Resource: arn:aws:states:::lambda:invoke.waitForTaskToken
      Parameters:
        FunctionName: arn:aws:lambda:eu-west-1:12345678:function:SaveTaskToken
        Payload:
          taskToken
      End: true

在日志中我可以看到:

the input is: {
  "taskToken": "5286"
}

它导致了另一个问题 - 它覆盖了状态机的输入。我正在传递输入:

{"giftCode": "xxx"}

在第一个 Lambda 函数中,RecipientFraudChecks,我可以获得输入。但是,在第二个中,由于添加了 Parameters 属性,我现在无法再获取状态机的输入,只能获取任务令牌...

编辑 在这里实现了答案:

{
  "Comment": "Steps for issuing a card",
  "StartAt": "RecipientFraudChecks",
  "States": {
    "RecipientFraudChecks": {
      "Type": "Task",
      "Next": "PauseCardIfNecessary",
      "ResultPath": "$.firstLambdaOutput",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:RecipientFraudChecks"
    },
    "PauseCardIfNecessary": {
      "Type": "Task",
      "Next": "GetOrCreateClient",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:PauseCardIfNecessary",
        "Payload": {
          "token.$": "$$.Task.Token",
          "otherInput.$": "$"
        }
      }
    },
    "GetOrCreateClient": {
      "Type": "Task",
      "Next": "GetOrAccountClient",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:GetOrCreateClient"
    },
    "GetOrAccountClient": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:GetOrAccountClient",
      "End": true
    }
  }
}

但是我又遇到了一个错误,这是日志:

arn: aws: states: eu-west-1: 123456789012: execution: HelloWorld5: cardIssue: {
  "Type": "TaskStateExited",
  "PreviousEventId": 5,
  "StateExitedEventDetails": {
    "Name": "RecipientFraudChecks",
    "Output": "{\"inputToStep\":\"xxxx\",\"firstLambdaOutput\":\"output of recipient lambda\"}"
  }
} arn: aws: states: eu-west-1: 123456789012: execution: HelloWorld5: cardIssue: {
  "Type": "TaskStateEntered",
  "PreviousEventId": 6,
  "StateEnteredEventDetails": {
    "Name": "PauseCardIfNecessary",
    "Input": "{\"inputToStep\":\"xxxx\",\"firstLambdaOutput\":\"output of recipient lambda\"}"
  }
} arn: aws: states: eu-west-1: 123456789012: execution: HelloWorld5: cardIssue: {
  "Type": "ExecutionFailed",
  "PreviousEventId": 7,
  "ExecutionFailedEventDetails": {
    "Error": "States.Runtime",
    "Cause": "An error occurred while executing the state 'PauseCardIfNecessary' (entered at the event id #7). The value for the field 'token.$' must be a valid JSONPath expression"
  }
}

关于输入,文档中似乎有一个简单的解决方案 [1]:

Instead of hard-coding the event payload in the state machine definition, you can use the input from the state machine execution. The following example uses the input specified when you run the state machine as the event payload:

"Payload.$": "$"

文档中还有另一个示例 [2]:

{  
   "StartAt":"GetManualReview",
   "States":{  
      "GetManualReview":{  
         "Type":"Task",
         "Resource":"arn:aws:states:::lambda:invoke.waitForTaskToken",
         "Parameters":{  
            "FunctionName":"get-model-review-decision",
            "Payload":{  
               "model.$":"$.new_model",
               "token.$":"$$.Task.Token"
            },
            "Qualifier":"prod-v1"
         },
         "End":true
      }
   }
}

您可以将 "model.$":"$.new_model" 更改为 "input.$":"$" 之类的内容并获得所需的 Lambda 负载。

在 YAML 中转换为以下内容:

Parameters:
  Payload:
    input.$: "$"
    token.$: "$$.Task.Token"

[1] https://docs.aws.amazon.com/lambda/latest/dg/services-stepfunctions.html#services-stepfunctions-setup
[2] https://docs.amazonaws.cn/en_us/step-functions/latest/dg/connect-lambda.html

任务令牌不会在 lambda 上下文中自动传递,它需要作为输入传递。

这里的context不是lambda函数的上下文,而是任务的上下文,为了从上下文中抓取细节,我们可以在step函数定义中使用$$., 例如:

  • 获取任务令牌$$.Task.Token
  • 获取开始时间$$.Execution.StartTime

示例步骤函数:

  • 第一个任务执行带步进函数输入的 Lambda。
  • 在第一个 lambda 的输出中附加阶跃函数输入。
  • 第二个任务使用资源 waitForTaskToken 执行另一个 lambda,在这里,我们获取任务令牌并将其与上一步的输出一起作为输入传递。
  • Step functions 一直等到它得到 SendTaskSuccess or SendTaskFailure

{
  "StartAt": "fist-lambda-invoke-sync",
  "States": {
    "fist-lambda-invoke-sync": {
      "Next": "second-lambda-invoke-task-token",
      "Type": "Task",
      "ResultPath": "$.firstLambdaOutput",
      "Resource": "arn:aws:lambda:us-east-1:11112223333:function:myfirstlambda"
    },
    "second-lambda-invoke-task-token": {
      "End": true,
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:11112223333:function:mysecondlambda",
        "Payload": {
          "token.$": "$$.Task.Token",
          "otherInput.$": "$"
        }
      }
    }
  }
}

步进函数的输入:

{
  "inputToStep": "myValue"
}

第一个 Lambda 的输出:假设它是一个字符串值“10”。由于 "ResultPath": "$.firstLambdaOutput"

,它将附加到输入 Json
{
  "inputToStep": "myValue",
  "firstLambdaOutput": "10"
}

第二个 Lambda 的输入:由于 "token.$": "$$.Task.Token""otherInput.$": "$"[=,将收到以下 json 作为附加了任务令牌的输入24=]

{ otherInput: { inputToStep: 'myValue', firstLambdaOutput: '10' },
  token: 'This is where Task Token generated by step function will be sent' }