亚马逊 SWF 查询
Amazon SWF queries
在过去的几年里,我在 Amazon SWF 上做了很多工作,但我仍然不清楚以下几点,而且我还无法在任何论坛上找到任何直接的答案。
我想这些是非常基本的要求,相信其他人也可能遇到过。如果有人能澄清这些就太好了。
- 有没有一种简单的方法可以 return 将工作流执行结果(可能只是简单的布尔值)返回到工作流启动器?
- 有没有办法捕获 Activity 超时异常,以便我们可以在这种情况下执行 运行 自定义操作?
- 为什么 WorkflowExecutionHistory 不包含活动,为什么只包含事件?
- 为什么没有从失败点重新启动工作流的简单方法?
我正在考虑在我的工作场所将 SWF 用于更多业务流程,但这些 limitations/doubts 阻碍了我!
最终工作解决方案
public class ReturnResultActivityImpl implements ReturnResultActivity {
SettableFuture future;
public ReturnResultActivityImpl() {
}
public ReturnResultActivityImpl(SettableFuture future) {
this.future = future;
}
public void returnResult(WorkflowResult workflowResult) {
System.out.print("Marking future as Completed");
future.set(workflowResult);
}
}
public class WorkflowResult {
public WorkflowResult(boolean s, String n) {
this.success = s;
this.note = n;
}
private boolean success;
private String note;
}
public class WorkflowStarter {
@Autowired
ReturnResultActivityClient returnResultActivityClient;
@Autowired
DummyWorkflowClientExternalFactory dummyWorkflowClientExternalFactory;
@Autowired
AmazonSimpleWorkflowClient swfClient;
String domain = "test-domain;
boolean isRegister = true;
int days = 7;
int terminationTimeoutSeconds = 5000;
int threadPollCount = 2;
int taskExecutorThreadCount = 4;
public String testWorkflow() throws Exception {
SettableFuture<WorkflowResult> workflowResultFuture = SettableFuture.create();
String taskListName = "testTaskList-" + RandomStringUtils.randomAlphabetic(8);
ReturnResultActivity activity = new ReturnResultActivityImpl(workflowResultFuture);
SpringActivityWorker activityWorker = buildReturnResultActivityWorker(taskListName, Arrays.asList(activity));
DummyWorkflowClientExternalFactory factory = new DummyWorkflowClientExternalFactoryImpl(swfClient, domain);
factory.getClient().doSomething(taskListName)
WorkflowResult result = workflowResultSettableFuture.get(20, TimeUnit.SECONDS);
return "Call result note - " + result.getNote();
}
public SpringActivityWorker buildReturnResultActivityWorker(String taskListName, List activityImplementations)
throws Exception {
return setupActivityWorker(swfClient, domain, taskListName, isRegister, days, activityImplementations,
terminationTimeoutSeconds, threadPollCount, taskExecutorThreadCount);
}
}
public class Workflow {
@Autowired
private DummyActivityClient dummyActivityClient;
@Autowired
private ReturnResultActivityClient returnResultActivityClient;
@Override
public void doSomething(final String resultActivityTaskListName) {
Promise<Void> activityPromise = dummyActivityClient.dummyActivity();
returnResult(resultActivityTaskListName, activityPromise);
}
@Asynchronous
private void returnResult(final String taskListname, Promise waitFor) {
ActivitySchedulingOptions schedulingOptions = new ActivitySchedulingOptions();
schedulingOptions.setTaskList(taskListname);
WorkflowResult result = new WorkflowResult(true,"All successful");
returnResultActivityClient.returnResult(result, schedulingOptions);
}
}
标准模式是在用于传递结果的工作流启动进程中托管一个特殊的 activity。使用特定于流程的任务列表来确保它被路由到启动器的正确实例。以下是实施步骤:
- 定义一个activity来接收结果。例如 "returnResultActivity"。使此 activity 实现完成在执行时传递给其构造函数的 Future。
- 当工作流启动时,它接收 "resultActivityTaskList" 作为输入参数。最后,工作流使用工作流结果调用此 activity。 activity 已安排在已通过的任务列表中。
- 工作流启动器创建一个 ActivityWorker 和一个 Future 实例。然后它创建一个 "returnResultActivity" 的实例,并将该 future 作为构造函数参数。
- 然后它将 activity 实例注册到 activity worker 并将其配置为轮询随机生成的任务列表名称。然后它调用 "start workflow execution" 将生成的任务列表名称作为输入参数传递。
- 然后等待 Future 完成。 future.get() 将转到 return 工作流结果。
是的,如果您使用的是 AWS Flow Framework,当 activity 超时时会抛出超时异常。如果您不使用 Flow 框架,那么您的生活就会变得艰难 100 倍。顺便说一句,工作流超时也作为超时异常被抛入父工作流。无法从超时实例本身捕获工作流超时异常。在这种情况下,建议不要依赖工作流超时,而只是创建一个计时器,该计时器将触发并通知工作流逻辑某些业务事件已超时。
因为单个 activity 执行有多个关联的事件。编写将历史记录转换为您喜欢的任何活动表示形式的代码应该非常容易。这样的代码只会匹配与每个活动相关的事件。每个事件总是有对相关事件的引用,因此很容易将它们汇总到更高级别的表示中。
不幸的是,这个问题没有简单的答案。理想情况下,SWF 将通过复制其历史记录直至故障点来支持重新启动工作流。但它不受支持。我个人认为,工作流应该以一种永不失败但始终处理失败而不失败的方式编写。显然,如果由于意外情况而导致失败,它就不起作用。在这种情况下,以可以从头重新启动的方式编写工作流是最简单的方法。
在过去的几年里,我在 Amazon SWF 上做了很多工作,但我仍然不清楚以下几点,而且我还无法在任何论坛上找到任何直接的答案。
我想这些是非常基本的要求,相信其他人也可能遇到过。如果有人能澄清这些就太好了。
- 有没有一种简单的方法可以 return 将工作流执行结果(可能只是简单的布尔值)返回到工作流启动器?
- 有没有办法捕获 Activity 超时异常,以便我们可以在这种情况下执行 运行 自定义操作?
- 为什么 WorkflowExecutionHistory 不包含活动,为什么只包含事件?
- 为什么没有从失败点重新启动工作流的简单方法?
我正在考虑在我的工作场所将 SWF 用于更多业务流程,但这些 limitations/doubts 阻碍了我!
最终工作解决方案
public class ReturnResultActivityImpl implements ReturnResultActivity {
SettableFuture future;
public ReturnResultActivityImpl() {
}
public ReturnResultActivityImpl(SettableFuture future) {
this.future = future;
}
public void returnResult(WorkflowResult workflowResult) {
System.out.print("Marking future as Completed");
future.set(workflowResult);
}
}
public class WorkflowResult {
public WorkflowResult(boolean s, String n) {
this.success = s;
this.note = n;
}
private boolean success;
private String note;
}
public class WorkflowStarter {
@Autowired
ReturnResultActivityClient returnResultActivityClient;
@Autowired
DummyWorkflowClientExternalFactory dummyWorkflowClientExternalFactory;
@Autowired
AmazonSimpleWorkflowClient swfClient;
String domain = "test-domain;
boolean isRegister = true;
int days = 7;
int terminationTimeoutSeconds = 5000;
int threadPollCount = 2;
int taskExecutorThreadCount = 4;
public String testWorkflow() throws Exception {
SettableFuture<WorkflowResult> workflowResultFuture = SettableFuture.create();
String taskListName = "testTaskList-" + RandomStringUtils.randomAlphabetic(8);
ReturnResultActivity activity = new ReturnResultActivityImpl(workflowResultFuture);
SpringActivityWorker activityWorker = buildReturnResultActivityWorker(taskListName, Arrays.asList(activity));
DummyWorkflowClientExternalFactory factory = new DummyWorkflowClientExternalFactoryImpl(swfClient, domain);
factory.getClient().doSomething(taskListName)
WorkflowResult result = workflowResultSettableFuture.get(20, TimeUnit.SECONDS);
return "Call result note - " + result.getNote();
}
public SpringActivityWorker buildReturnResultActivityWorker(String taskListName, List activityImplementations)
throws Exception {
return setupActivityWorker(swfClient, domain, taskListName, isRegister, days, activityImplementations,
terminationTimeoutSeconds, threadPollCount, taskExecutorThreadCount);
}
}
public class Workflow {
@Autowired
private DummyActivityClient dummyActivityClient;
@Autowired
private ReturnResultActivityClient returnResultActivityClient;
@Override
public void doSomething(final String resultActivityTaskListName) {
Promise<Void> activityPromise = dummyActivityClient.dummyActivity();
returnResult(resultActivityTaskListName, activityPromise);
}
@Asynchronous
private void returnResult(final String taskListname, Promise waitFor) {
ActivitySchedulingOptions schedulingOptions = new ActivitySchedulingOptions();
schedulingOptions.setTaskList(taskListname);
WorkflowResult result = new WorkflowResult(true,"All successful");
returnResultActivityClient.returnResult(result, schedulingOptions);
}
}
标准模式是在用于传递结果的工作流启动进程中托管一个特殊的 activity。使用特定于流程的任务列表来确保它被路由到启动器的正确实例。以下是实施步骤:
- 定义一个activity来接收结果。例如 "returnResultActivity"。使此 activity 实现完成在执行时传递给其构造函数的 Future。
- 当工作流启动时,它接收 "resultActivityTaskList" 作为输入参数。最后,工作流使用工作流结果调用此 activity。 activity 已安排在已通过的任务列表中。
- 工作流启动器创建一个 ActivityWorker 和一个 Future 实例。然后它创建一个 "returnResultActivity" 的实例,并将该 future 作为构造函数参数。
- 然后它将 activity 实例注册到 activity worker 并将其配置为轮询随机生成的任务列表名称。然后它调用 "start workflow execution" 将生成的任务列表名称作为输入参数传递。
- 然后等待 Future 完成。 future.get() 将转到 return 工作流结果。
是的,如果您使用的是 AWS Flow Framework,当 activity 超时时会抛出超时异常。如果您不使用 Flow 框架,那么您的生活就会变得艰难 100 倍。顺便说一句,工作流超时也作为超时异常被抛入父工作流。无法从超时实例本身捕获工作流超时异常。在这种情况下,建议不要依赖工作流超时,而只是创建一个计时器,该计时器将触发并通知工作流逻辑某些业务事件已超时。
因为单个 activity 执行有多个关联的事件。编写将历史记录转换为您喜欢的任何活动表示形式的代码应该非常容易。这样的代码只会匹配与每个活动相关的事件。每个事件总是有对相关事件的引用,因此很容易将它们汇总到更高级别的表示中。
不幸的是,这个问题没有简单的答案。理想情况下,SWF 将通过复制其历史记录直至故障点来支持重新启动工作流。但它不受支持。我个人认为,工作流应该以一种永不失败但始终处理失败而不失败的方式编写。显然,如果由于意外情况而导致失败,它就不起作用。在这种情况下,以可以从头重新启动的方式编写工作流是最简单的方法。