CQRS、事件溯源和扩展

CQRS, Event Sourcing and Scaling

很明显,基于这些模式的系统很容易扩展。但我想问你,具体如何?我对可扩展性有几个疑问:

  1. 如何缩放聚合?如果我将创建 aggregate A 的多个实例,如何同步它们?如果其中一个实例处理命令并创建事件,则该事件应该传播到该聚合的每个实例?
  2. 不应该存在一些业务逻辑来请求聚合的哪个实例?因此,如果我发出适用于 aggregate A (ORDERS) 并适用于一个特定订单的多个命令,则将其传送到同一实例是有意义的。或者?

本文中:https://initiate.andela.com/event-sourcing-and-cqrs-a-look-at-kafka-e0c1b90d17d8, 他们正在使用带有分区的 Kafka。因此,用户管理服务 - 聚合已缩放,但仅订阅主题的特定分区,其中包含特定用户的所有事件。

谢谢!

How to scale aggregates?

  • 谨慎选择聚合,确保你的命令在众多聚合中合理分布。您不希望聚合可能从并发用户接收大量命令。

  • 发送到聚合实例的序列化命令。这可以通过聚合存储库和命令 bus/queue 来完成。但对我来说,最简单的方法是按照 this post by Michiel Rook

  • 中所述使用聚合版本控制进行乐观锁定

which instance of the agregate to request?

在我们的 reSolve framework 中,我们在每个命令上创建聚合实例并且不在请求之间保留它。这工作起来出奇地快——获取 100 个事件并将它们缩减为聚合状态比在集群中找到正确的聚合实例要快。

这种方法是可扩展的,可以让您实现无服务器 - 每个命令调用一次 lambda,中间没有共享状态。聚合事件过多的罕见情况可通过快照解决。

How to scale aggregates?

系统中的每条信息都具有单一的逻辑权限。单个数据的多个权限会让您争执不休。您可以通过创建 更小 非重叠边界来扩展写入——每个机构的责任范围更小

To borrow from your example, an example of smaller responsibilities would
be to shift from one aggregate for all ORDERS to one aggregate for _each_
ORDER.

It's analogous to the difference between having a key value store with
all ORDERS stored in a document under one key, vs each ORDER being stored
using its own key.

读取是安全的,您可以将它们扩展为多个副本。然而,这些副本只是最终一致的。这意味着如果你问 "what is the bid price of FCOJ now?" 你可能会从每个副本中得到不同的答案。或者,如果你问 "what was the bid price of FCOJ at 10:09:02?" 那么每个副本都会给你一个单一的答案或者说 "I don't know yet".

But if the granularity is already one command per aggregate, what is not very often possible in my opinion, and you have really many concurrent accesses, how to solve it? How to spread the load and stay without the conflict as much as possible?

粗略的草图 - 它通过可以根据命令消息的内容计算的密钥存储的每个聚合。聚合的更新是通过使用该键的比较和交换操作实现的。

Acquire a message
Compute the storage key
Load a versioned representation from storage
Compute a new versioned representation
Store.compare and swap the new representation for the old

要提供额外的流量吞吐量,您需要添加更多的无状态计算。

为了提供存储吞吐量,您将密钥分布在更多存储设备上。

路由层可用于将消息分组在一起 - 路由器使用与以前相同的存储密钥计算,但使用它来选择计算场中的何处转发消息。然后计算可以检查它收到的每批消息是否有重复键,并一起处理这些消息(交易一些额外的计算以减少比较和交换的次数)。

健全的消息协议很重要;参见 Marc de Graauw 的 Nobody Needs Reliable Messaging.

How to scale aggregates?

聚合实例由它们的事件流表示。每个聚合实例都有自己的事件流。来自一个聚合实例的事件不被其他聚合实例使用。例如,如果 ID=1 的 Order Aggregate 创建一个 ID=1001 的 OrderWasCreated 事件,则该事件将永远不会用于再水化其他 Order Aggregate 实例(ID=2,3,4...)。

也就是说,您可以根据聚合 ID 在事件存储上创建分片来横向扩展聚合。

If I will create multiple instances of aggregate A, how to sync them? If one of the instances process the command and create an event, this event should be propagated to every instance of that agregate?

你不知道。每个聚合实例都与其他实例完全分开。

为了能够横向扩展命令处理,建议每次从事件存储中加载一个聚合实例,方法是重播其先前生成的所有事件。您可以执行一项优化来提高性能:聚合快照,但建议仅在确实需要时才执行此操作。 This 回答可能有所帮助。

Shouldn't be there some business logic present which instance of the agregate to request? So if I am issuing multiple commands which applies to aggregate A (ORDERS) and applies to one specific order, it make sense to deliver it to the same instance. Or?

您假设聚合实例在某些服务器的 RAM 上连续 运行。你可以这样做,但这样的架构非常复杂。例如,当其中一台服务器出现故障并且必须由其他服务器替换时会发生什么?很难确定哪些实例住在那里并重新启动它们。相反,您可以拥有许多 stateless 服务器来处理任何聚合实例的命令。当命令到达时,您识别聚合 ID,通过重播所有先前的事件从事件存储中加载它,然后它可以执行该命令。在执行命令并将新事件持久化到 Event store 后,您可以丢弃 Aggregate 实例。到达同一聚合实例的下一个命令可以由任何其他 stateless 服务器处理。因此,可伸缩性仅由事件存储本身的可伸缩性决定。