事件采购聚合中的版本号?

Version number in event sourcing aggregate?

我正在构建微服务。我的微服务之一是使用 CQRS 和事件源。系统中引发了集成事件,我将聚合保存在事件存储中,同时更新了我的读取模型。

我的问题是,为什么我们在根据聚合更新事件流时需要聚合版本?我读到我们需要这个来保持一致性,并且事件要按顺序重播,我们需要在保存之前检查版本(https://blog.leifbattermann.de/2017/04/21/12-things-you-should-know-about-event-sourcing/)我仍然无法理解这个问题,因为事件是按顺序引发和保存的,所以我真的需要具体的例子来理解我们从版本中得到什么好处以及我们为什么需要它们。

非常感谢,

伊姆兰

是的,这是对的。您需要版本号或序列号以保持一致性。

你想要两件事:

  1. 顺序正确
    通常事件在本质上是幂等的,因为在分布式系统中幂等消息或事件更容易处理。幂等消息是指即使多次应用也会产生相同结果的消息。用固定值(比如一个)更新寄存器是幂等的,但将计数器递增 1 则不是。在分布式系统中,当 A 向 B 发送消息时,B 会确认 A。但是如果 B 使用了该消息并且由于某些网络错误而丢失了对 A 的确认,则 A 不知道 B 是否收到了消息,因此它会发送消息再次。现在 B 再次应用消息,如果消息不是幂等的,最终状态就会出错。所以,你想要幂等消息。但是,如果您未能按照生成它们的相同顺序应用这些幂等消息,您的状态将再次出错。可以使用版本 ID 或序列来实现此排序。如果您的事件存储是 RDBMS,则如果没有任何类似的排序键,您将无法对事件进行排序。在 Kafka 中,您也有偏移量 ID,客户端会跟踪它已消耗的偏移量

  2. 去重
    其次,如果您的消息不是幂等的怎么办?或者,如果您的消息是幂等的,但消费者以不确定的方式调用某些外部服务怎么办。在这种情况下,您需要一个恰好一次的语义,因为如果您两次应用相同的消息,您的状态将是错误的。此处还需要版本 ID 或序列号。如果在消费者端,您跟踪已经处理过的版本 ID,则可以根据 ID 进行重复数据删除。在 Kafka 中,您可能希望将偏移量 id 存储在消费者端

根据评论进一步说明:

相关文章的作者将 RDBMS 假定为事件存储。版本 ID 或事件序列应由生产者生成。因此,在您的示例中,"delivered" 事件的顺序将高于 "in transit" 事件。

当您想并行处理事件时会出现问题。如果一个消费者获得 "delivered" 事件而另一个消费者获得 "in transit" 事件怎么办?显然,您必须确保特定订单的所有事件都由同一消费者处理。在 Kafka 中,您可以通过选择订单 ID 作为分区键来解决这个问题。由于一个分区将仅由一个消费者处理,你知道你总是会在交付前得到 "in transit"。但是多个订单将分布在同一消费者组内的不同消费者中,因此您进行并行处理。

关于aggregate id,我认为这是Kafka中topic的同义词。由于作者采用 RDBMS 存储,因此他需要一些标识符来分隔不同类别的消息。您可以通过在 Kafka 中创建单独的主题以及每个聚合的消费者组来做到这一点。

让我描述一个聚合版本有用的案例:

在我们的reSove framework聚合版本中用于乐观并发控制。

我会举例说明。假设 InventoryItem 聚合接受命令 AddItemsOrderItemsAddItems 增加库存商品数量,OrderItems - 减少。 假设您有一个 InventoryItem 聚合 #123,其中有一个事件 - ITEMS_ADDED,数量为 5。聚合 #123 状态说有 5 件商品。

所以您的 UI 向用户显示有 5 件商品有货。用户 A 决定订购 3 件商品,用户 B - 4 件。两者几乎同时发出 OrderItems 命令,假设用户 A 是第一个几毫秒。

现在,如果您在内存中有一个聚合 #123 实例,在单线程中,您没有问题 - 来自用户 A 的第一个命令会成功,事件会被应用,状态说数量是2,因此来自用户 B 的第二个命令将失败。

在来自 A 和 B 的命令在不同进程中的分布式或无服务器系统中,如果我们不使用某些并发控制,两个命令都会成功并使聚合进入不正确的状态。有几种方法可以做到这一点 - 悲观锁定、命令队列、聚合存储库或乐观锁定。

乐观锁似乎是最简单实用的解决方案:

我们说每个聚合都有一个版本 - 其流中的事件数。所以我们的聚合 #123 有版本 1.

当聚合发出一个事件时,这个事件数据有一个聚合版本。在我们的例子中,来自用户 A 和 B 的 ITEMS_ORDERED 事件的事件聚合版本为 2。显然,聚合事件的版本应该按顺序增加。所以我们需要做的只是设置一个数据库约束,即元组 {aggregateId, aggregateVersion} 在写入事件存储时应该是唯一的。

让我们看看我们的示例如何在具有乐观并发控制的分布式系统中运行:

  • 用户 A 发出命令 OrderItem 聚合 #123

  • 聚集 #123 从事件中恢复 {version 1, quantity 5}

  • 用户 B 发出命令 OrderItem 聚合 #123

  • 从事件(版本 1,数量 5)恢复聚合 #123 的另一个实例

  • 用户A的聚合实例执行命令,成功,事件ITEMS_ORDERED {aggregateId 123, version 2}被写入事件存储。

  • 用户 B 的聚合实例执行命令,它成功,事件 ITEMS_ORDERED {aggregateId 123, version 2} 它尝试将其写入事件存储并因并发异常而失败。

  • 在用户 B 的此类异常命令处理程序中,只需重复整个过程 - 然后聚合 #123 将处于 {version 2, quantity 2} 状态,命令将正确执行。

我希望这清除了聚合版本有用的情况。