Axon Framework:Saga 项目,在两个或三个微服务之间进行补偿事件

Axon Framework: Saga project with compensation events between two or three microservices

我有一个关于 Axon Saga 的问题。我有一个项目,其中有三个微服务,每个微服务都有自己的数据库,但是两个 "Slave" 微服务必须将他的数据共享给 "Master" 微服务,为此我想使用 Axon Saga。我已经问过关于赔偿的问题,出了问题,我自己处理赔偿,可以,但不理想。目前我正在使用 DistributedCommandBus 在微服务之间进行通信,这样做有好处吗?我使用的是 Choreography Saga 模型,现在是这样的:

  1. Master -> 发送命令 -> Slave1 -> 处理事件
  2. Slave1 -> 发回命令 -> Master -> 处理事件
  3. Master -> 发送命令 -> Slave2 -> 处理事件
  4. Slave2 -> 发回命令 -> Master -> 处理事件

如果出现问题,补偿 Commands/Events 就会倒退。

我的问题是,有没有人用 Axon 做过这样的事情,有补偿,最佳做法是什么?如何重试 Saga 进程?使用 RetryScheduler?如果可以的话,添加一个 github 仓库。

谢谢,马特

我不知道你的确切用例,但从这个问题和你之前的问题中我得到你想要回滚的印象,或者在这种情况下撤消事件,如果事件处理程序之一无法处理它。

总的来说,您可以做一些事情。您可以查看首先应用该事件的聚合是否有或可以有信息来检查 'slave' 微服务是否应该能够在您应用它之前处理该事件。如果这不可行,从属微服务还可以直接在事件总线上应用 'failure' 事件,以通知系统的其余部分发生了需要处理的故障状态:

https://docs.axoniq.io/reference-guide/implementing-domain-logic/event-handling/dispatching-events#dispatching-events-from-a-non-aggregate

首先,让我回答你的主要问题:

My question is has anybody did something like this with Axon?

很快,是的,因为这是 Sagas 的主要用例之一。 根据经验,我想说明 Saga 可用于协调复杂的业务交易

  1. 几个不同的聚合实例
  2. 几个限界上下文

从表面上看,您似乎已经选择了委托复杂业务交易的选项二。

需要注意的是,当你在使用 Sagas 时,你应该非常有意识地处理任何异常and/or命令调度结果。

因此,如果您从“Master”向“Slave 1”发送命令,而后者操作失败,则此结果将返回到 Saga。 因此,这为您提供了重试操作的第一个选项,我建议使用 补偿操作 来完成。 最后,通过补偿操作,我正在谈论调度命令来触发它。

如果您不能依赖派发命令的直接响应,retrying/rescheduling在 Saga 中发送消息是第二个合理的选择。

为此,Axon 拥有 EventSchedulerDeadlineManager。 注意,两者前者发布一个事件给大家看看。 后者在单个 Saga 实例的上下文中安排了一个 DeadlineMessage,从而限制了谁可以看到正在发生重试的范围。

通常情况下,DeadlineManager 将是我的首选操作模式,除非您要求每个人都能看到此 'rescheduling action'。 仅供参考,检查 this page for EventScheduler information and this 页面以获取 DeadlineManager 信息。

样本更新

这里有一些伪代码,可以让您了解 Saga 事件处理程序中的补偿操作是什么样的:

class SomeSaga {

    private CommandGateway commandGateway;

    @SagaEventHandler(assocationValue = "some-key")
    public void on(SomeEvent event) {
        // perform some checks, validation and state setting, if necessary
        commandGateway.send(new MyActionCommand(...))
                      .exceptionally(throwable -> {
                                         commandGateway.send(new CompensatingAction(...));
                                     });
    }
}