java 中的 AWS SDK - 如何在状态机多次执行时从工作中获取活动
AWS SDK in java - How to get activities from worker when multple execution on going for a state machine
AWS 步骤函数
我的问题是如何将 sendTaskSuccess 或 sendTaskFailuer 发送到 Activity,它们在 AWS 的状态机下 运行。
我的实际意图是通知属于特定状态机执行的特定活动。
我已通过 activityARN 成功向所有等待活动发送通知。但我的实际需要是向属于特定状态机执行的特定 activity 发送通知。
例子。状态机-SM1
SM1-- SM1E1, SM1E2 有两个执行。在那种情况下,我想将 sendTaskSuccess 发送到属于 SM1E1 的 activity 。
以下是我使用的代码。但它会向所有活动发送通知
GetActivityTaskResult getActivityTaskResult = client.getActivityTask(new GetActivityTaskRequest()
.withActivityArn("arn detail"));
if (getActivityTaskResult.getTaskToken() != null) {
try {
JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
String outputResult = patientRegistrationActivity.setStatus(json.get("patientId").textValue());
System.out.println("outputResult " + outputResult);
SendTaskSuccessRequest sendTaskRequest = new SendTaskSuccessRequest().withOutput(outputResult)
.withTaskToken(getActivityTaskResult.getTaskToken());
client.sendTaskSuccess(sendTaskRequest);
} catch (Exception e) {
client.sendTaskFailure(
new SendTaskFailureRequest().withTaskToken(getActivityTaskResult.getTaskToken()));
}
据我所知,您无法控制返回哪个任务令牌。您可能会为 SM1E1 或 SM1E2 获得一个,您无法通过查看任务令牌来判断。 GetActivityTask returns "input" so based on that you may be able to tell which execution you are dealing with but if you get a token you are not interested in, I don't think there's a way to put it back so you won't be able to get it again with GetActivityTask稍后。我想你可以把它存储在某个地方的数据库中以备后用。
您可以尝试的一个想法是使用新的 callback integration pattern. You can specify the Payload parameter in the state definition to include the task token like this token.$: "$$.Task.Token"
and then use GetExecutionHistory 查找您感兴趣的执行的 TaskScheduled 状态并检索 parameters.Payload.token
值,然后将其与 sendTaskSuccess.
一起使用
这是我的 serverless.yml 文件的一个片段,它描述了状态
WaitForUserInput: #Wait for the user to do something
Type: Task
Resource: arn:aws:states:::lambda:invoke.waitForTaskToken
Parameters:
FunctionName:
Fn::GetAtt: [WaitForUserInputLambdaFunction, Arn]
Payload:
token.$: "$$.Task.Token"
executionArn.$: "$$.Execution.Id"
Next: DoSomethingElse
我做了一个 POC 检查,下面是解决方案。
如果令牌被 getActivityTaskResult.getTaskToken()
消耗,并且如果请求输入不满足您的条件,那么您可以使用下面的行来避免令牌消耗 .awsStepFunctionClient.sendTaskHeartbeat(new SendTaskHeartbeatRequest().withTaskToken(taskToken))
AWS 步骤函数
我的问题是如何将 sendTaskSuccess 或 sendTaskFailuer 发送到 Activity,它们在 AWS 的状态机下 运行。
我的实际意图是通知属于特定状态机执行的特定活动。
我已通过 activityARN 成功向所有等待活动发送通知。但我的实际需要是向属于特定状态机执行的特定 activity 发送通知。
例子。状态机-SM1 SM1-- SM1E1, SM1E2 有两个执行。在那种情况下,我想将 sendTaskSuccess 发送到属于 SM1E1 的 activity 。
以下是我使用的代码。但它会向所有活动发送通知
GetActivityTaskResult getActivityTaskResult = client.getActivityTask(new GetActivityTaskRequest()
.withActivityArn("arn detail"));
if (getActivityTaskResult.getTaskToken() != null) {
try {
JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
String outputResult = patientRegistrationActivity.setStatus(json.get("patientId").textValue());
System.out.println("outputResult " + outputResult);
SendTaskSuccessRequest sendTaskRequest = new SendTaskSuccessRequest().withOutput(outputResult)
.withTaskToken(getActivityTaskResult.getTaskToken());
client.sendTaskSuccess(sendTaskRequest);
} catch (Exception e) {
client.sendTaskFailure(
new SendTaskFailureRequest().withTaskToken(getActivityTaskResult.getTaskToken()));
}
据我所知,您无法控制返回哪个任务令牌。您可能会为 SM1E1 或 SM1E2 获得一个,您无法通过查看任务令牌来判断。 GetActivityTask returns "input" so based on that you may be able to tell which execution you are dealing with but if you get a token you are not interested in, I don't think there's a way to put it back so you won't be able to get it again with GetActivityTask稍后。我想你可以把它存储在某个地方的数据库中以备后用。
您可以尝试的一个想法是使用新的 callback integration pattern. You can specify the Payload parameter in the state definition to include the task token like this token.$: "$$.Task.Token"
and then use GetExecutionHistory 查找您感兴趣的执行的 TaskScheduled 状态并检索 parameters.Payload.token
值,然后将其与 sendTaskSuccess.
这是我的 serverless.yml 文件的一个片段,它描述了状态
WaitForUserInput: #Wait for the user to do something
Type: Task
Resource: arn:aws:states:::lambda:invoke.waitForTaskToken
Parameters:
FunctionName:
Fn::GetAtt: [WaitForUserInputLambdaFunction, Arn]
Payload:
token.$: "$$.Task.Token"
executionArn.$: "$$.Execution.Id"
Next: DoSomethingElse
我做了一个 POC 检查,下面是解决方案。
如果令牌被 getActivityTaskResult.getTaskToken()
消耗,并且如果请求输入不满足您的条件,那么您可以使用下面的行来避免令牌消耗 .awsStepFunctionClient.sendTaskHeartbeat(new SendTaskHeartbeatRequest().withTaskToken(taskToken))