AWS 简单工作流通知技术
AWS Simple workflow notification technique
我是 AWS SWF 的新手,我的任务是仅在 activity 首次尝试失败时发出通知(通过电子邮件)。我使用 Settable<Boolean>
作为我的标志,但该值不可靠,因为工作流是 运行 异步的。这是我的代码:
final AsyncExecutor asyncExecutor = new AsyncRetryingExecutor(retryPolicy, workflowClock);
final Settable<Boolean> notifyException = new Settable<>();
new TryCatch() {
@Override
protected void doTry() throws Throwable {
asyncExecutor.execute(() -> new TryCatch() {
@Override
protected void doTry() throws Throwable {
Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
}
@Override
protected void doCatch(Throwable e) throws Throwable {
if (!notifyException.isReady()) {
// PERFORM notification service here!!
notifyException.set(true);
} else {
// DO NOTHING. Notification is already sent the first time :)
}
throw e;
}
});
}
@Override
protected void doCatch(Throwable e) throws Throwable {
System.out.println("======");
System.out.println("CATCH ALL!! " + e.getMessage());
System.out.println("======");
}
};
notifyException
的值始终在变化,即使我在 if statement
中明确将其值设置为 true
。我的代码的结果是它将执行超过 1 个 notification service
.
====更新===
final AsyncExecutor asyncExecutor = new AsyncRetryingExecutor(retryPolicy, workflowClock);
final Settable<Boolean> notifyException = new Settable<>();
new TryCatch() {
@Override
protected void doTry() throws Throwable {
asyncExecutor.execute(() -> new TryCatch() {
@Override
protected void doTry() throws Throwable {
Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
}
@Override
protected void doCatch(Throwable e) throws Throwable {
if (!notifyException.isReady()) {
activityOneClient.notify();
notifyException.set(true);
}
throw e;
}
});
}
@Override
protected void doCatch(Throwable e) throws Throwable {
System.out.println("======");
System.out.println("CATCH ALL!! " + e.getMessage());
System.out.println("======");
}
};
当我重新抛出异常时,activityOneClient.notify()
只会在asyncExecutor
的最后一次重试时执行,所以如果我不重新抛出,activityOneClient.notify()
将是自动执行。如果第一次出现异常,我只需要通知即可。
代码看起来还不错(还没有 运行)。
你的问题在别的地方。流程框架的工作方式是,每当有工作要做时,工作流工作人员或 activity 工作人员接手任务,运行 并报告结果。报告结果意味着执行的结果被捕获在工作流历史中。
每当历史改变决策者(工作流)工作者 运行s 并决定接下来应该发生什么。在幕后,决策者只是重播所有历史并从一开始就做出所有决定。现在,如果决策与历史记录相匹配,决策者就会继续下去,直到它遇到新的东西。
简单的例子。假设您有一个包含 3 个步骤的工作流程:
- activity1
- activity2 - 取决于 activity1
的结果
- activity3 - 取决于 activity2
的结果
[ 请记住,乘法活动可以同时 运行,如果 activity 的输出与另一个 activity 的输入之间没有 link流程框架将向前推进并并行安排多项工作。保持简单,这样你就明白了 ]
在工作流启动时的 T0 没有历史记录,决策程序 运行s 和 activity1 已安排。
在 T1,activity1 名工人拿起任务,执行并报告结果。
在 T2 作为 activity1 更新历史的结果,决策者被安排并且 运行s + 它读取历史。它看到 activity1 应该 运行,它在历史记录中看到它,它看到它完成,它安排 activity2
在 T3,activity2 名工人拿起任务,执行并报告结果。
在 T4 作为 activity2 更新历史的结果,决策者被安排并且 运行s + 它读取历史。它看到 activity1 应该 运行,它在历史记录中看到它,它看到它完成了。它现在看到 activity2 应该 运行,它在历史记录中看到它,它看到它已经完成并继续前进到 activity3。它安排 activity3.
等等。
你的问题是,每次决策 运行s(每个决策)时,决策程序中指示流程的代码都是 运行。因此,当决策者到达 activity 时,它将看到它有 运行 并且已抛出异常,并且它将通过设置标志的代码 + 发送电子邮件。
当指数重试开始时,将设置标志,但发送电子邮件的代码将在每个决策上 运行(决策者基本上是无状态的,状态是基于历史建立的).
在这种特殊情况下,您可以做的是将发送电子邮件的部分单独移动 activity。这样,决策者将在历史记录中看到它并快进,而不是每次都 运行ning 它。
为了只通知第一个抛出的异常,我做的是:
@Override
protected void doTry() throws Throwable {
asyncExecutor.execute(() -> new TryCatchFinally() {
Throwable throwable = null;
@Override
protected void doTry() throws Throwable {
Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
}
@Override
protected void doCatch(Throwable e) throws Throwable {
if (!notifyException.isReady()) {
activityOneClient.notify();
notifyException.set(true);
}
throwable = e;
}
@Override
protected void doFinally() throws Throwable {
if (throwable != null) {
throw throwable;
}
}
});
}
我是 AWS SWF 的新手,我的任务是仅在 activity 首次尝试失败时发出通知(通过电子邮件)。我使用 Settable<Boolean>
作为我的标志,但该值不可靠,因为工作流是 运行 异步的。这是我的代码:
final AsyncExecutor asyncExecutor = new AsyncRetryingExecutor(retryPolicy, workflowClock);
final Settable<Boolean> notifyException = new Settable<>();
new TryCatch() {
@Override
protected void doTry() throws Throwable {
asyncExecutor.execute(() -> new TryCatch() {
@Override
protected void doTry() throws Throwable {
Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
}
@Override
protected void doCatch(Throwable e) throws Throwable {
if (!notifyException.isReady()) {
// PERFORM notification service here!!
notifyException.set(true);
} else {
// DO NOTHING. Notification is already sent the first time :)
}
throw e;
}
});
}
@Override
protected void doCatch(Throwable e) throws Throwable {
System.out.println("======");
System.out.println("CATCH ALL!! " + e.getMessage());
System.out.println("======");
}
};
notifyException
的值始终在变化,即使我在 if statement
中明确将其值设置为 true
。我的代码的结果是它将执行超过 1 个 notification service
.
====更新===
final AsyncExecutor asyncExecutor = new AsyncRetryingExecutor(retryPolicy, workflowClock);
final Settable<Boolean> notifyException = new Settable<>();
new TryCatch() {
@Override
protected void doTry() throws Throwable {
asyncExecutor.execute(() -> new TryCatch() {
@Override
protected void doTry() throws Throwable {
Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
}
@Override
protected void doCatch(Throwable e) throws Throwable {
if (!notifyException.isReady()) {
activityOneClient.notify();
notifyException.set(true);
}
throw e;
}
});
}
@Override
protected void doCatch(Throwable e) throws Throwable {
System.out.println("======");
System.out.println("CATCH ALL!! " + e.getMessage());
System.out.println("======");
}
};
当我重新抛出异常时,activityOneClient.notify()
只会在asyncExecutor
的最后一次重试时执行,所以如果我不重新抛出,activityOneClient.notify()
将是自动执行。如果第一次出现异常,我只需要通知即可。
代码看起来还不错(还没有 运行)。
你的问题在别的地方。流程框架的工作方式是,每当有工作要做时,工作流工作人员或 activity 工作人员接手任务,运行 并报告结果。报告结果意味着执行的结果被捕获在工作流历史中。
每当历史改变决策者(工作流)工作者 运行s 并决定接下来应该发生什么。在幕后,决策者只是重播所有历史并从一开始就做出所有决定。现在,如果决策与历史记录相匹配,决策者就会继续下去,直到它遇到新的东西。
简单的例子。假设您有一个包含 3 个步骤的工作流程:
- activity1
- activity2 - 取决于 activity1 的结果
- activity3 - 取决于 activity2 的结果
[ 请记住,乘法活动可以同时 运行,如果 activity 的输出与另一个 activity 的输入之间没有 link流程框架将向前推进并并行安排多项工作。保持简单,这样你就明白了 ]
在工作流启动时的 T0 没有历史记录,决策程序 运行s 和 activity1 已安排。
在 T1,activity1 名工人拿起任务,执行并报告结果。
在 T2 作为 activity1 更新历史的结果,决策者被安排并且 运行s + 它读取历史。它看到 activity1 应该 运行,它在历史记录中看到它,它看到它完成,它安排 activity2
在 T3,activity2 名工人拿起任务,执行并报告结果。
在 T4 作为 activity2 更新历史的结果,决策者被安排并且 运行s + 它读取历史。它看到 activity1 应该 运行,它在历史记录中看到它,它看到它完成了。它现在看到 activity2 应该 运行,它在历史记录中看到它,它看到它已经完成并继续前进到 activity3。它安排 activity3.
等等。
你的问题是,每次决策 运行s(每个决策)时,决策程序中指示流程的代码都是 运行。因此,当决策者到达 activity 时,它将看到它有 运行 并且已抛出异常,并且它将通过设置标志的代码 + 发送电子邮件。
当指数重试开始时,将设置标志,但发送电子邮件的代码将在每个决策上 运行(决策者基本上是无状态的,状态是基于历史建立的).
在这种特殊情况下,您可以做的是将发送电子邮件的部分单独移动 activity。这样,决策者将在历史记录中看到它并快进,而不是每次都 运行ning 它。
为了只通知第一个抛出的异常,我做的是:
@Override
protected void doTry() throws Throwable {
asyncExecutor.execute(() -> new TryCatchFinally() {
Throwable throwable = null;
@Override
protected void doTry() throws Throwable {
Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
}
@Override
protected void doCatch(Throwable e) throws Throwable {
if (!notifyException.isReady()) {
activityOneClient.notify();
notifyException.set(true);
}
throwable = e;
}
@Override
protected void doFinally() throws Throwable {
if (throwable != null) {
throw throwable;
}
}
});
}