如何在轴突框架中处理从 saga 发送的命令

How to handle commands sent from saga in axon framework

使用 saga,给定一个事件 EventA,saga 开始,它发送一个命令(或多个)。 我们如何确保命令发送成功然后其他微服务中的实际逻辑没有抛出等

让我们举一个电子邮件传奇的例子: 当用户注册时,我们创建一个发布 UserRegisteredEvent 的用户聚合,将创建一个传奇,这个传奇负责确保向用户发送注册电子邮件(电子邮件可能包含验证密钥、欢迎消息等)。

我们应该使用:

  1. commandGateway.sendAndWait 和 try/catch -> 它能缩放吗?

  2. commandGateway.send 并使用截止日期并使用某种 "fail event",例如 SendEmailFailedEvent -> 需要为命令关联一个 "token",以便可以关联 "associationProperty" 具有正确的传奇 发送 SendRegistrationEmailCommand

  3. commandGateway.send(...).handle(...) -> 在句柄中我们可以引用 MyEmailSaga 中的 eventGateway/commandGateway 吗? 如果错误我们发送一个事件?或者我们可以 modify/call 来自我们拥有的 saga 实例的方法。如果没有错误,则其他服务已发送类似 "RegistrationEmailSentEvent" 的事件,因此 saga 将结束。

  4. 使用deadline,因为我们只是使用"send",不处理可能发送失败的命令的最终错误(其他服务宕机等)

  5. 还有别的吗?

  6. 还是全部的组合?

如何处理下面的错误? (使用截止日期或 .handle(...) 或其他)

错误可能是:

@Saga
public class MyEmailSaga {

    @Autowired
    transient CommandGateway commandGateway;


    @Autowired
    transient EventGateway eventGateway;

    @Autowired
    transient SomeService someService;

    String id;
    SomeData state;
    /** count retry times we send email so can apply logic on it */
    int sendRetryCount;

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void on(UserRegisteredEvent event) {
        id = event.getApplicationId();
        //state = event........
        // what are the possibilities here? 
        // Can we use sendAndWait but it does not scale very well, right?
        commandGateway.send(new SendRegistrationEmailCommand(...));
        // Is deadline good since we do not handle the "send" of the command
    }

    // Use a @DeadlineHandler to retry ?

    @DeadlineHandler(deadlineName = "retry_send_registration_email")
    fun on() {
         // resend command and re-schedule a deadline, etc
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "id")
    public void on(RegistrationEmailSentEvent event) {

    }

}

编辑(接受答案后):

主要有两个选项(对不起,下面是kotlin代码):

第一个选项

commandGateway.send(SendRegistrationEmailCommand(...))
    .handle({ t, result ->
    if (t != null) {
       // send event (could be caught be the same saga eventually) or send command or both
    }else{
       // send event (could be caught be the same saga eventually) or send command or both
    }
    })
// If not use handle(...) then you can use thenApply as well
    .thenApply { eventGateway.publish(SomeSuccessfulEvent(...)) }
    .thenApply { commandGateway.send(SomeSuccessfulSendOnSuccessCommand) }

第二个选项: 如果 SendRegistrationEmailCommand 失败并且您没有收到任何失败事件(当您不处理发送的命令时),请使用截止日期确保 saga 执行某些操作。

当然可以将截止日期用于其他目的。

成功接收到 SendRegistrationEmailCommand 后,接收方将发布一个事件,以便 saga 收到通知并采取行动。 可以是 RegistrationEmailSentEvent 或 RegistrationEmailSendFailedEvent。

总结:

似乎最好只在命令发送失败或接收方抛出意外异常时才使用handle(),如果是这样,则发布一个事件供传奇采取行动。 如果成功,接收者应该发布事件,saga 将监听它(并最终注册一个截止日期以防万一); Receiver也可以发送事件通知错误不抛出,saga也会监听这个事件。

理想情况下,您将使用异步选项来处理错误。这将是 commandGateway.send(command)commandGateway.send(command).thenApply()。如果故障与业务逻辑相关,那么在这些故障上发出事件可能是有意义的。一个普通的 gateway.send(command) 就有意义了; Saga 可以对作为结果返回的事件做出反应。否则,您将不得不处理命令的结果。

是否需要使用 sendAndWait 或仅 send().then... 取决于失败时您需要执行的 activity。不幸的是,当异步处理结果时,您不能再安全地修改 Saga 的状态。 Axon 可能已经保留了 saga 的状态,导致这些更改丢失。 sendAndWait 解决了这个问题。可扩展性通常不是问题,因为可以并行执行不同的 Sagas,具体取决于您的处理器配置。

Axon 团队目前正在寻找可能的 API,以允许在 Sagas 中安全异步执行逻辑,同时仍然保证线程安全和状态持久性。