SWF:结束基于 Promise 的工作流程

SWF: End workflow based on Promise

我有一个调用 activity 的决策器。 activity return 是验证错误列表。如果决策者收到该列表并且它不为空,我想退出工作流程。

要退出工作流,我只需在决策程序中 return,它就会退出工作流。问题是我无法从 activity 获得结果。我原以为这会起作用,但它不起作用.. violations.isReady() 总是 returns false:

public class WorkflowImpl implements Workflow {
    ActivitiesClient activities;

    public Workflow(/*...*/) {
        // ...
    }

    @Override
    public void do(WorkflowInput input) {
        Promise<List<String>> violations = activities.validate(input);

        if (!violations.isReady()) { // "do()" will be called when violations is ready.. right?
            return;
        } else if (!CollectionUtils.isEmpty(violations.get())) {
            return; // or throw ValidationException
        }

        // do other stuff
    }
}

我不想做替代方案,即轮询 violations.isReady() 是否为真,因为这会占用决策者线程,谁知道会占用多长时间。我什至不确定这是否可行。

帮忙?

工作流代码是异步的。因此,从 activity 返回的 Promise 永远不会在同一个 回调 中准备就绪。所以在你的代码中 violations.isReady() 总是返回 false 是正确的行为。 您必须使用注释为 @Asynchronous 或 Task 的方法将 callback 关联到 Promise。所以你的代码应该是这样的:

public class WorkflowImpl implements Workflow {
  ActivitiesClient activities;

  public Workflow(/*...*/) {
      // ...
  }

  @Override
  public void do(WorkflowInput input) {
    Promise<List<String>> violations = activities.validate(input);
    processValidationResult(violations);
  }

  @Asynchronous
  private void processValidationResult(Promise<List<String>> violations) {
    // As method is @Asynchronous framework guarantees violations is ready 
    // when its body is executed.
    if (!CollectionUtils.isEmpty(violations.get())) {
        return; // or throw ValidationException
    }
    // do other stuff
  }
}

注意 AspectJ should be configured correctly 与 Flow Framework Aspects for @Asynchronous annotation 生效。

另一种选择是直接使用任务:

public class WorkflowImpl implements Workflow {
  ActivitiesClient activities;

  public Workflow(/*...*/) {
      // ...
  }

  @Override
  public void do(WorkflowInput input) {
    Promise<List<String>> violations = activities.validate(input);
    new Task(violations) {
       @Override
       public void  doExecute() {
         // Framework guarantees violations is ready 
         // (as it is Task constructor parameter)
         // when execute method is executed.
         if (!CollectionUtils.isEmpty(violations.get())) {
             return; // or throw ValidationException
         }
         // do other stuff
       }
    };
  }
}

我建议通读 SWF Recipes and samples 以更好地了解编写 SWF 工作流时使用的模式。

还要确保您通过 TryCatchFinally javadoc 作为 AWS Flow Framework 错误处理,虽然非常强大,但与大多数人习惯的完全不同。

已添加以显示从 processValidationResult(...) 返回值的示例:

  @Override
  public void do(WorkflowInput input) {
    Promise<List<String>> violations = activities.validate(input);
    Promise<String> whatever = processValidationResult(violations);
    processNextStep(whatever);
  }

  @Asynchronous
  private Promise<String> processValidationResult(Promise<List<String>> violations) {
    // As method is @Asynchronous framework guarantees violations is ready 
    // when its body is executed.
    if (!CollectionUtils.isEmpty(violations.get())) {
        throw new ValidationException(...);
    }
    // do other stuff
    return Promise.asPromise("result string");
  }

  @Asynchronous
  private void processNextStep(Promise<String> whatever) {
     ...
  }