分离 Axon 命令及其效果

Separating Axon commands and its effects

我正在维护一个事件源应用程序,恐怕它偏离了道路。

在一种情况下,聚合根收到一个命令,该聚合根发布一个事件,该事件由需要做两件事的事件处理程序处理:

  1. 将命令 (cmd1) 发送到另一个聚合根,该聚合根将发布一个事件,该事件将创建多个传奇,每次触发一些最终由多个聚合处理的命令
  2. 发送第二个命令 (cmd2),这也会导致各种 command/event/command 序列。

示意图形式:

cmd0 -> AR0 -> evt0 -> evtHandler -> cmd1 -> AR1 -> evt1 -> saga stuff and more cmds and evts
                                 |-> cmd2 -> AR2 -> evt2 -> more saga stuff, cmds and evts

一切都发生在同一个线程中,一切都发生在从第一个命令处理开始的 1 个事务中。

现在 目标:所有事件、saga 的、源自第一个命令 (cmd1) 的聚合调用应该首先发生,然后是源自第二个命令的所有事件、saga 的聚合调用命令 (cmd2) 应该发生。

这是观察结果: cmd1 调用发布 evt1 的 AR1,但之后 cmd2 调用 AR2 发布 evt2。来自 cmd1 的所有其他事件和命令都与来自 cmd2 的事件和命令混合在一起。

首先我认为我可以使用 UnitOfWork 解决它,但即使明确创建一个单独的工作单元来处理 cmd1 也没有解决问题。查看 AbstractEventBus 中的实现,我发现事件只是简单地合并到父工作单元中,因此最终与源自 cmd2 的事件合并。

这里是问题:有没有一种方法可以先调用 cmd1 并等待所有源于该命令的效果在调用 cmd2 之前得到处理,同时仍然保留我目前的事务原子性有吗?

Jan,老实说,如果您的应用程序中的组件不太依赖该顺序,那将是最好的。 它本质上意味着你有不同的消息处理组件,它们本质上可能是不同的微服务,但它们都捆绑在一起,因为顺序很重要。

理想情况下,您应该将组件设置为独立工作。 因此,聚合处理命令并发布结果,完成。 Sagas 对事件做出反应,不管它们来自哪里,并通过动作(例如命令调度)对它们做出反应。

接受这种可能性会有所帮助,因为它将放弃等待一个过程完成的整个要求。

从理论上讲,这就是我的回答。

从一个更务实的角度来看你的问题,我想指出这听起来像是你要进入的一个兔子洞。您不仅希望完成 cmd1 处理,还希望解决 all sagas 上的事件处理,包括由此产生的命令,对吗?谁能告诉我 Sagas 的数量是多少?或者需要考虑那些 saga dispatch 的命令数量?这些标准可能会随着时间的推移而改变,添加更多需要“在单个交易中”发生的东西。

嗯,是的,有一些方法可以等待某些部分的处理,然后将它们全部拉到一个事务中。但老实说,我不建议走这条路,因为它只会使使用这种基于消息的系统变得越来越复杂。

关键是什么 所有效果是什么。从调度该命令的角度来看,您应该只关心该确切命令是否成功处理是或否,这就是问题应该结束的地方。

我知道这不会为您提供简单的程序化解决方案,因为您需要调整设计。但是我觉得解耦才是去听的唯一正确方式。

这是我对这种情况的两分钱,希望这能在 1 月以任何方式进一步帮助您


消息预期说明更新

本质上,您在 Axon 应用程序中使用的消息构成了一个边界。一个边界,在这个边界之后组件基本上不知道 什么 将处理这些消息。每条消息的行为略有不同,但可能会阐明您打开的内容:

  • 命令 - 命令始终路由到单个实例上的单个句柄。此外,您可以预期以 OK 或 NOK 形式出现的响应。 OK 的意思是处理程序是 void 或创建的实体的标识符(如聚合本身)。 NOK 通常是您从命令处理方法中抛出的异常,表示出现问题 命令根本无法执行,应该让调度端知道。
  • 事件 - 事件将被广播到 任何 组件,该组件已经订阅了 EventBus 能够处理给定的事件。请注意,事件处理在时间上与事件的实际发布点是分开的。这意味着事件处理的结果不可能(或应该)返回给事件的调度程序。
  • 查询 - 查询消息可以多种形式发送。单个组件最适合回答查询(称为点对点查询)。您还可以将查询分派给多个处理程序并聚合结果(称为 Scatter-Gather 查询)。最后,您可以通过执行“订阅查询”来订阅查询模型,这本质上是点对点的组合,然后是 Flux 更新。显然,查询分派意味着您正在从某个组件接收结果。只是您可以自由选择查询类型。如果需要任何关于查询响应的“最新”性的保证,应该是正在发送的查询的实现的一部分,以及如何通过 @QueryHandler 注释方法处理它。

希望这能让您更清楚地了解每条消息在 Axon 应用程序中的作用!