特定条件下的 AWS SWF 重启工作流
AWS SWF Restart workflow under specific conditions
我有一个 SWF 工作流程和 activity。以下是结构:
WorkflowClientImpl Class:
class TempWorkflowImpl() {
@Override
public void execute() {
new TryCatchFinallly {
@Override
protected void doTry() throws Throwable {
activityClient.invoke();
}
@Override
protected void doFinally() throws Throwable {
// Clean up code
}
@Override
protected void doCatch() throws Throwable {
// Handle Exception
}
}
}
}
ActivityClientImpl Class:
class TempActivityImpl() {
@Override
public void invoke() {
// Perform some logic
// Check if API call (API_Call_A) is made previously
// If not Invoke API_Call_A.
// If yes, throw exception
}
}
activity class 调用了一个异步方法 API。 API 调用中定义的操作大约需要一个小时才能完成。有时,由于某些原因,操作可能会在执行时失败。此 API 调用是在我无权访问的服务上定义的。
有没有办法让 activity 休眠,以便它可以在一小时后检查操作是否成功。如果不成功,我将重新调用 API 调用。让我们假设这次操作会成功,并且我们不会陷入 API 次调用尝试的无限循环。
Thread.sleep()
似乎是一种方式,但我不确定这是最合适的方式。我还发现我们可以使用
重新启动整个工作流程
Promise<Void> timer = decisionContextProvider.getDecisionContext().getWorkflowClock().createTimer(TimeUnit.MINUTES.toSeconds(TimeinMinutes));
continueAsNew(timer);
要使用上面的方法,我可以在 API 调用后从 activity 方法 return 获取 TimeinMinutes
的值,然后在一个小时后重新启动工作流。
以上的做法是不是最合适?或者有更好的方法吗?
谢谢
无需调用 continueAsNew 除非您的工作流历史记录很大(例如在调用 activity 100 次之后)。只需使用 @Asynchronous 方法或任务来等待承诺。我会将您的工作流建模为两个活动:调用和检查结果并在延迟后执行检查结果并使用@ExponentialResult 重试直到结果可用。
class TempWorkflowImpl() {
private final WorkflowClock clock = decisionContextProvider.getDecisionContext().getWorkflowClock()
@Override
public void execute() {
new TryCatchFinallly {
@Override
protected void doTry() throws Throwable {
invoke();
}
@Override
protected void doFinally() throws Throwable {
// Clean up code
}
@Override
protected void doCatch() throws Throwable {
// Handle Exception
}
}
}
@Asynchronous
// On ServiceFailureException retry from the beginning
@ExponentialRetry(initialRetryIntervalSeconds=300, exceptionsToRetry=ServiceFailureException.class)
private Promise<ResultType> invoke() {
Promise<Void> invoked = activityClient.invoke();
Promise<ResultType> result = checkResultAfterDelay(invoked);
processResult(result);
}
@Asynchronous
private Promise<ResultType> checkResultAfterDelay(Promise<Void> invoked) {
Promise<Void> timer = clock.createTimer(TimeUnit.MINUTES.toSeconds(60));
return checkResult(timer);
}
@Asynchronous
// Automatically retry on ResultUnavailableException
@ExponentialRetry(initialRetryIntervalSeconds=300, exceptionsToRetry=ResultUnavailableException.class)
private Promise<ResultType> checkResult(Promise<Void> timer) {
return activityClient.checkResult();
}
@Asynchronous
private processResult(Promise<ResultType> result) {
....
}
}
我有一个 SWF 工作流程和 activity。以下是结构:
WorkflowClientImpl Class:
class TempWorkflowImpl() {
@Override
public void execute() {
new TryCatchFinallly {
@Override
protected void doTry() throws Throwable {
activityClient.invoke();
}
@Override
protected void doFinally() throws Throwable {
// Clean up code
}
@Override
protected void doCatch() throws Throwable {
// Handle Exception
}
}
}
}
ActivityClientImpl Class:
class TempActivityImpl() {
@Override
public void invoke() {
// Perform some logic
// Check if API call (API_Call_A) is made previously
// If not Invoke API_Call_A.
// If yes, throw exception
}
}
activity class 调用了一个异步方法 API。 API 调用中定义的操作大约需要一个小时才能完成。有时,由于某些原因,操作可能会在执行时失败。此 API 调用是在我无权访问的服务上定义的。 有没有办法让 activity 休眠,以便它可以在一小时后检查操作是否成功。如果不成功,我将重新调用 API 调用。让我们假设这次操作会成功,并且我们不会陷入 API 次调用尝试的无限循环。
Thread.sleep()
似乎是一种方式,但我不确定这是最合适的方式。我还发现我们可以使用
Promise<Void> timer = decisionContextProvider.getDecisionContext().getWorkflowClock().createTimer(TimeUnit.MINUTES.toSeconds(TimeinMinutes));
continueAsNew(timer);
要使用上面的方法,我可以在 API 调用后从 activity 方法 return 获取 TimeinMinutes
的值,然后在一个小时后重新启动工作流。
以上的做法是不是最合适?或者有更好的方法吗?
谢谢
无需调用 continueAsNew 除非您的工作流历史记录很大(例如在调用 activity 100 次之后)。只需使用 @Asynchronous 方法或任务来等待承诺。我会将您的工作流建模为两个活动:调用和检查结果并在延迟后执行检查结果并使用@ExponentialResult 重试直到结果可用。
class TempWorkflowImpl() {
private final WorkflowClock clock = decisionContextProvider.getDecisionContext().getWorkflowClock()
@Override
public void execute() {
new TryCatchFinallly {
@Override
protected void doTry() throws Throwable {
invoke();
}
@Override
protected void doFinally() throws Throwable {
// Clean up code
}
@Override
protected void doCatch() throws Throwable {
// Handle Exception
}
}
}
@Asynchronous
// On ServiceFailureException retry from the beginning
@ExponentialRetry(initialRetryIntervalSeconds=300, exceptionsToRetry=ServiceFailureException.class)
private Promise<ResultType> invoke() {
Promise<Void> invoked = activityClient.invoke();
Promise<ResultType> result = checkResultAfterDelay(invoked);
processResult(result);
}
@Asynchronous
private Promise<ResultType> checkResultAfterDelay(Promise<Void> invoked) {
Promise<Void> timer = clock.createTimer(TimeUnit.MINUTES.toSeconds(60));
return checkResult(timer);
}
@Asynchronous
// Automatically retry on ResultUnavailableException
@ExponentialRetry(initialRetryIntervalSeconds=300, exceptionsToRetry=ResultUnavailableException.class)
private Promise<ResultType> checkResult(Promise<Void> timer) {
return activityClient.checkResult();
}
@Asynchronous
private processResult(Promise<ResultType> result) {
....
}
}