AWS 简单工作流通知技术

AWS Simple workflow notification technique

我是 AWS SWF 的新手,我的任务是仅在 activity 首次尝试失败时发出通知(通过电子邮件)。我使用 Settable<Boolean> 作为我的标志,但该值不可靠,因为工作流是 运行 异步的。这是我的代码:

 final AsyncExecutor asyncExecutor = new AsyncRetryingExecutor(retryPolicy, workflowClock);
 final Settable<Boolean> notifyException = new Settable<>();

    new TryCatch() {
        @Override
        protected void doTry() throws Throwable {
            asyncExecutor.execute(() -> new TryCatch() {
                @Override
                protected void doTry() throws Throwable {
                    Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
                }

                @Override
                protected void doCatch(Throwable e) throws Throwable {
                    if (!notifyException.isReady()) {
                        // PERFORM notification service here!!
                        notifyException.set(true);
                    } else {
                        // DO NOTHING. Notification is already sent the first time :)
                    }

                    throw e;
                }
            });
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            System.out.println("======");
            System.out.println("CATCH ALL!! " + e.getMessage());
            System.out.println("======");
        }
    };

notifyException 的值始终在变化,即使我在 if statement 中明确将其值设置为 true。我的代码的结果是它将执行超过 1 个 notification service.

====更新===

 final AsyncExecutor asyncExecutor = new AsyncRetryingExecutor(retryPolicy, workflowClock);
 final Settable<Boolean> notifyException = new Settable<>();

    new TryCatch() {
        @Override
        protected void doTry() throws Throwable {
            asyncExecutor.execute(() -> new TryCatch() {
                @Override
                protected void doTry() throws Throwable {
                    Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
                }

                @Override
                protected void doCatch(Throwable e) throws Throwable {
                    if (!notifyException.isReady()) {
                        activityOneClient.notify();
                        notifyException.set(true);
                    }
                    throw e;
                }
            });
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            System.out.println("======");
            System.out.println("CATCH ALL!! " + e.getMessage());
            System.out.println("======");
        }
    };

当我重新抛出异常时,activityOneClient.notify()只会在asyncExecutor的最后一次重试时执行,所以如果我不重新抛出,activityOneClient.notify()将是自动执行。如果第一次出现异常,我只需要通知即可。

代码看起来还不错(还没有 运行)。

你的问题在别的地方。流程框架的工作方式是,每当有工作要做时,工作流工作人员或 activity 工作人员接手任务,运行 并报告结果。报告结果意味着执行的结果被捕获在工作流历史中。

每当历史改变决策者(工作流)工作者 运行s 并决定接下来应该发生什么。在幕后,决策者只是重播所有历史并从一开始就做出所有决定。现在,如果决策与历史记录相匹配,决策者就会继续下去,直到它遇到新的东西。

简单的例子。假设您有一个包含 3 个步骤的工作流程:

  • activity1
  • activity2 - 取决于 activity1
  • 的结果
  • activity3 - 取决于 activity2
  • 的结果

[ 请记住,乘法活动可以同时 运行,如果 activity 的输出与另一个 activity 的输入之间没有 link流程框架将向前推进并并行安排多项工作。保持简单,这样你就明白了 ]

在工作流启动时的 T0 没有历史记录,决策程序 运行s 和 activity1 已安排。

在 T1,activity1 名工人拿起任务,执行并报告结果。

在 T2 作为 activity1 更新历史的结果,决策者被安排并且 运行s + 它读取历史。它看到 activity1 应该 运行,它在历史记录中看到它,它看到它完成,它安排 activity2

在 T3,activity2 名工人拿起任务,执行并报告结果。

在 T4 作为 activity2 更新历史的结果,决策者被安排并且 运行s + 它读取历史。它看到 activity1 应该 运行,它在历史记录中看到它,它看到它完成了。它现在看到 activity2 应该 运行,它在历史记录中看到它,它看到它已经完成并继续前进到 activity3。它安排 activity3.

等等。

你的问题是,每次决策 运行s(每个决策)时,决策程序中指示流程的代码都是 运行。因此,当决策者到达 activity 时,它将看到它有 运行 并且已抛出异常,并且它将通过设置标志的代码 + 发送电子邮件。

当指数重试开始时,将设置标志,但发送电子邮件的代码将在每个决策上 运行(决策者基本上是无状态的,状态是基于历史建立的).

在这种特殊情况下,您可以做的是将发送电子邮件的部分单独移动 activity。这样,决策者将在历史记录中看到它并快进,而不是每次都 运行ning 它。

为了只通知第一个抛出的异常,我做的是:

@Override
    protected void doTry() throws Throwable {
        asyncExecutor.execute(() -> new TryCatchFinally() {

            Throwable throwable = null;
            @Override
            protected void doTry() throws Throwable {
                Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
            }

            @Override
            protected void doCatch(Throwable e) throws Throwable {
                if (!notifyException.isReady()) {
                    activityOneClient.notify();
                    notifyException.set(true);
                }
                throwable = e;
            }

            @Override
            protected void doFinally() throws Throwable {
                if (throwable != null) {
                    throw throwable;
                }
            }
        });
    }