如何防止分布式 CQRS/ES 系统中的视图更新丢失?
How to prevent lost updates on the views in a distributed CQRS/ES system?
我有一个 CQRS/ES 应用程序,其中一些视图由来自多个聚合根的事件填充。
我在 CashRegister
聚合根上有一个 CashRegisterActivated
事件,在 Sale
聚合根上有一个 SaleCompleted
事件。这两个事件都用于填充 CashRegisterView
。 CashRegisterActivated
事件创建 CashRegisterView
或将其设置为活动状态以防它已存在。 SaleCompleted
事件设置最后一次销售序列号并更新抽屉中的现金。
当其中两个事件在毫秒内到达时,第一个更新将被最后一个更新覆盖。所以这是一个丢失的更新。
我已经想到了一些可能的解决方案,但它们都有各自的缺点:
- 在同一线程上编组一个视图或一个视图的一个记录的所有事件处理。这在单个节点上运行良好,但一旦你向外扩展,事情就开始变得复杂起来。您需要确保视图的所有事件都传递到同一节点。当它宕机时,您需要迁移到另一个节点。这需要一些了解事件和视图的智能负载平衡器。
- 更新前锁定记录以确保没有其他线程或节点同时修改它。这可能会很好地工作,但这意味着放弃无锁系统。线程将设置在那里,等待释放锁。锁定还意味着当我扩展数据存储时延迟增加(如果我没记错的话)。
郑重声明:我将 Java 与 Apache Camel、RabbitMQ 一起使用来传送事件,并使用 MariaDB 来存储视图数据。
I have a CQRS/ES application where some of the views in the read model are populated by events from multiple aggregate roots.
这可能是一个错误。
从一个孤立的事件中驱动一个进程。但是组成一个视图通常需要一个历史,而不是一个单一的事件。
更可能的实现是使用事件的到来来标记当前视图过时,并使用单个编写器根据相关聚合生成的事件历史记录更新视图。
And that requires a smart messaging solution. I thought "Smart endpoints and dumb pipes" would be a good practice for CQRS/ES systems.
是的。端点只需要足够聪明以了解何时需要历史记录或何时事件就足够了。
视图毕竟只是一个快照。您获取输入(X.history、Y.history),生成快照,将快照写入您的视图存储(可能使用描述所使用历史位置的元数据),然后您就完成了。
事件 仅用于向作者表明先前的快照已过时。您不使用事件来扩展历史记录,而是使用事件告诉作者历史记录已更改。
您不会丢失 多个事件的更新,因为事件本身及其所有状态都被记录在历史记录中。这是用于构建事件源视图的历史记录。
康拉德·加鲁斯 wrote
... handling events coming from a single source is easier, but more importantly because a DB-backed event store trivially guarantees ordering and has no issues with lost or duplicate messages.
尽管我看到您询问如何大规模处理这些事情 - 我看到人们建议使用单线程方法 - 直到它真正成为一个问题 - 然后解决它。
我会给每个视图模型一个流程管理器,从商店中提取您需要的事件并将它们写入单线程。
解决方案可能是检测这种情况何时发生,然后重试。
为此:
- 向每个 table 添加保持最新的聚合版本号
- 在每个更新语句中添加以下 where 子句 "aggr_version=n-1"(其中 n 是正在处理的事件的版本)
- 当update语句的结果是没有被修改的记录时,可能意味着事件处理顺序不对,可以执行重试策略
问题是这增加了复杂性并且难以测试。性能瓶颈很可能在数据库中,因此带有故障转移解决方案的单个进程可能是最简单的解决方案。
我将 VoiceOfUnreason 和 StefRave 的答案组合成我认为可能有用的东西。从多个聚合根填充视图确实感觉不对。我们有重试队列的乱序检测。因此聚合根上的事件只有在最后一个完全处理的事件是版本 n-1 时才会被处理。
因此,当我为将由多个聚合根(比如聚合视图)填充的视图创建新的聚合根时,视图的所有更新都将在没有行锁定或线程同步的情况下同步。我们还在聚合根上使用重试机制进行冲突检测,这将处理命令端的并发性。因此,如果我只是从我当前用于填充聚合视图的事件构建这些聚合根,我将解决丢失更新的问题。
关于这个解决方案的想法?
我有一个 CQRS/ES 应用程序,其中一些视图由来自多个聚合根的事件填充。
我在 CashRegister
聚合根上有一个 CashRegisterActivated
事件,在 Sale
聚合根上有一个 SaleCompleted
事件。这两个事件都用于填充 CashRegisterView
。 CashRegisterActivated
事件创建 CashRegisterView
或将其设置为活动状态以防它已存在。 SaleCompleted
事件设置最后一次销售序列号并更新抽屉中的现金。
当其中两个事件在毫秒内到达时,第一个更新将被最后一个更新覆盖。所以这是一个丢失的更新。
我已经想到了一些可能的解决方案,但它们都有各自的缺点:
- 在同一线程上编组一个视图或一个视图的一个记录的所有事件处理。这在单个节点上运行良好,但一旦你向外扩展,事情就开始变得复杂起来。您需要确保视图的所有事件都传递到同一节点。当它宕机时,您需要迁移到另一个节点。这需要一些了解事件和视图的智能负载平衡器。
- 更新前锁定记录以确保没有其他线程或节点同时修改它。这可能会很好地工作,但这意味着放弃无锁系统。线程将设置在那里,等待释放锁。锁定还意味着当我扩展数据存储时延迟增加(如果我没记错的话)。
郑重声明:我将 Java 与 Apache Camel、RabbitMQ 一起使用来传送事件,并使用 MariaDB 来存储视图数据。
I have a CQRS/ES application where some of the views in the read model are populated by events from multiple aggregate roots.
这可能是一个错误。
从一个孤立的事件中驱动一个进程。但是组成一个视图通常需要一个历史,而不是一个单一的事件。
更可能的实现是使用事件的到来来标记当前视图过时,并使用单个编写器根据相关聚合生成的事件历史记录更新视图。
And that requires a smart messaging solution. I thought "Smart endpoints and dumb pipes" would be a good practice for CQRS/ES systems.
是的。端点只需要足够聪明以了解何时需要历史记录或何时事件就足够了。
视图毕竟只是一个快照。您获取输入(X.history、Y.history),生成快照,将快照写入您的视图存储(可能使用描述所使用历史位置的元数据),然后您就完成了。
事件 仅用于向作者表明先前的快照已过时。您不使用事件来扩展历史记录,而是使用事件告诉作者历史记录已更改。
您不会丢失 多个事件的更新,因为事件本身及其所有状态都被记录在历史记录中。这是用于构建事件源视图的历史记录。
康拉德·加鲁斯 wrote
... handling events coming from a single source is easier, but more importantly because a DB-backed event store trivially guarantees ordering and has no issues with lost or duplicate messages.
尽管我看到您询问如何大规模处理这些事情 - 我看到人们建议使用单线程方法 - 直到它真正成为一个问题 - 然后解决它。 我会给每个视图模型一个流程管理器,从商店中提取您需要的事件并将它们写入单线程。
解决方案可能是检测这种情况何时发生,然后重试。 为此:
- 向每个 table 添加保持最新的聚合版本号
- 在每个更新语句中添加以下 where 子句 "aggr_version=n-1"(其中 n 是正在处理的事件的版本)
- 当update语句的结果是没有被修改的记录时,可能意味着事件处理顺序不对,可以执行重试策略
问题是这增加了复杂性并且难以测试。性能瓶颈很可能在数据库中,因此带有故障转移解决方案的单个进程可能是最简单的解决方案。
我将 VoiceOfUnreason 和 StefRave 的答案组合成我认为可能有用的东西。从多个聚合根填充视图确实感觉不对。我们有重试队列的乱序检测。因此聚合根上的事件只有在最后一个完全处理的事件是版本 n-1 时才会被处理。
因此,当我为将由多个聚合根(比如聚合视图)填充的视图创建新的聚合根时,视图的所有更新都将在没有行锁定或线程同步的情况下同步。我们还在聚合根上使用重试机制进行冲突检测,这将处理命令端的并发性。因此,如果我只是从我当前用于填充聚合视图的事件构建这些聚合根,我将解决丢失更新的问题。
关于这个解决方案的想法?