AWS Simple Workflow Service - 运行 long 运行ning 可靠地处理
AWS Simple Workflow Service - run long running processes reliably
情况:
我想使用 AWS SWF 来协调长时间的 运行 手动活动。当在 AWS 中安排 activity 时,我将其传输到数据库以在 UI 上显示哪些任务正在等待处理。这些任务可能需要数周才能完成,因此它们在 SWF 中有大量超时。
问题:
万一我的应用程序无法填充数据库(挂起或死掉而没有报告任何错误),那么任务不会被人看到并且重试只能在几周后发生,当 activity 超时(这是显然不能接受)。
问题:
所以我希望能够 "start" 任务(比如超时 30 秒),当应用程序确定 activity 现在开始时,将超时设置为数周。真的可以用SWF优雅地做到吗?
(我已经通读了文档和几个示例,但仍然不明白 运行 手动任务的设想方式是什么)
很遗憾,SWF 服务不支持 "start activity task" API 调用。我使用的解决方法是使用具有短超时的 activity 将记录插入数据库。然后在关于它的手动任务完成信号工作流上。需要一个单独的计时器来处理手动任务超时。所有这些逻辑都可以封装在一个单独的 class 中以供重用。
使用信号的另一个好处是手动任务通常有多个状态。例如,当任务被认领并稍后被释放时,工作流可以发出信号。每个状态可以有不同的超时。
[编辑:添加了 strawman ManualActivityClient 示例]
public class ManualActivityClient {
private final Map<String, Settable<Void>> outstandingManualActivities = new HashMap<>();
private StartManualActivityClient startActivityClient;
private WorkflowClock clock;
public Promise<Void> invoke(String id, String activityArgs, long timeout) {
Promise<Void> started = startActivityClient.start(id, activityArgs);
Settable<Void> completionPromise = new Settable<>();
outstandingManualActivities.put(id, completionPromise);
// TryFinally is used to define cancellation scope for automatic timer cancellation.
new TryFinally() {
@Override
protected void doTry() throws Throwable {
// Wrap timer invocation in Task(true) to give it daemon flag. Daemon tasks are automatically
// cancelled when all other tasks in the same scope (defined by doTry) are done.
new Task(true) {
@Override
protected void doExecute() throws Throwable {
Promise<Void> manualActivityTimeout = clock.createTimer(timeout);
new Task(manualActivityTimeout) {
@Override
protected void doExecute() throws Throwable {
throw new TimeoutException("Manual activity " + id + " timed out");
}
};
}
};
// This task is used to "wait" for manual task completion. Without it the timer would be
// immediately cancelled.
new Task(completionPromise) {
@Override
protected void doExecute() throws Throwable {
// Intentionally empty
}
};
}
@Override
protected void doFinally() throws Throwable {
}
};
return completionPromise;
}
public void signalManualActivityCompletion(String id) {
// Set completionPromise to ready state
outstandingManualActivities.get(id).set(null);
}
}
而这个 class 可以用作:
@Workflow(...)
public class ManualActivityWorkflow {
private ManualActivityClient manualActivityClient;
@Execute(...)
public void execute() {
// ...
Promise<Void> activity1 = manualActivityClient.invoke("activity1", "someArgs1", 300);
Promise<Void> activity2 = manualActivityClient.invoke("activity2", "someArgs2", 300);
// ...
}
@Signal(...)
public void signalManualActivityCompletion(String id) {
manualActivityClient.signalManualActivityCompletion(id);
}
}
情况:
我想使用 AWS SWF 来协调长时间的 运行 手动活动。当在 AWS 中安排 activity 时,我将其传输到数据库以在 UI 上显示哪些任务正在等待处理。这些任务可能需要数周才能完成,因此它们在 SWF 中有大量超时。
问题:
万一我的应用程序无法填充数据库(挂起或死掉而没有报告任何错误),那么任务不会被人看到并且重试只能在几周后发生,当 activity 超时(这是显然不能接受)。
问题:
所以我希望能够 "start" 任务(比如超时 30 秒),当应用程序确定 activity 现在开始时,将超时设置为数周。真的可以用SWF优雅地做到吗?
(我已经通读了文档和几个示例,但仍然不明白 运行 手动任务的设想方式是什么)
很遗憾,SWF 服务不支持 "start activity task" API 调用。我使用的解决方法是使用具有短超时的 activity 将记录插入数据库。然后在关于它的手动任务完成信号工作流上。需要一个单独的计时器来处理手动任务超时。所有这些逻辑都可以封装在一个单独的 class 中以供重用。
使用信号的另一个好处是手动任务通常有多个状态。例如,当任务被认领并稍后被释放时,工作流可以发出信号。每个状态可以有不同的超时。
[编辑:添加了 strawman ManualActivityClient 示例]
public class ManualActivityClient {
private final Map<String, Settable<Void>> outstandingManualActivities = new HashMap<>();
private StartManualActivityClient startActivityClient;
private WorkflowClock clock;
public Promise<Void> invoke(String id, String activityArgs, long timeout) {
Promise<Void> started = startActivityClient.start(id, activityArgs);
Settable<Void> completionPromise = new Settable<>();
outstandingManualActivities.put(id, completionPromise);
// TryFinally is used to define cancellation scope for automatic timer cancellation.
new TryFinally() {
@Override
protected void doTry() throws Throwable {
// Wrap timer invocation in Task(true) to give it daemon flag. Daemon tasks are automatically
// cancelled when all other tasks in the same scope (defined by doTry) are done.
new Task(true) {
@Override
protected void doExecute() throws Throwable {
Promise<Void> manualActivityTimeout = clock.createTimer(timeout);
new Task(manualActivityTimeout) {
@Override
protected void doExecute() throws Throwable {
throw new TimeoutException("Manual activity " + id + " timed out");
}
};
}
};
// This task is used to "wait" for manual task completion. Without it the timer would be
// immediately cancelled.
new Task(completionPromise) {
@Override
protected void doExecute() throws Throwable {
// Intentionally empty
}
};
}
@Override
protected void doFinally() throws Throwable {
}
};
return completionPromise;
}
public void signalManualActivityCompletion(String id) {
// Set completionPromise to ready state
outstandingManualActivities.get(id).set(null);
}
}
而这个 class 可以用作:
@Workflow(...)
public class ManualActivityWorkflow {
private ManualActivityClient manualActivityClient;
@Execute(...)
public void execute() {
// ...
Promise<Void> activity1 = manualActivityClient.invoke("activity1", "someArgs1", 300);
Promise<Void> activity2 = manualActivityClient.invoke("activity2", "someArgs2", 300);
// ...
}
@Signal(...)
public void signalManualActivityCompletion(String id) {
manualActivityClient.signalManualActivityCompletion(id);
}
}