CQRS 读取端、多个事件流主题、并发/竞争条件
CQRS read side, multiple event stream topics, concurrency / race conditions
我在读取/查询端以正确的顺序(重新)应用来自多个主题的事件时遇到问题。
示例:
在写入/命令端,我们有 2 个具有 n:m 关系的聚合:
- 联系方式
- 群组
这些聚合在 2 个单独的事件流主题 上产生以下事件(因为最佳实践说:每个聚合一个主题。我完全同意):
联系主题:
ContactCreated (contactId: "123", name: "Peter")
ContactAddedToGroup (contactId: "123", groupId: "456")
群主题:
GroupCreated (groupId: "456", name: "Customers")
在读取/查询端(例如 Elasticsearch),我想执行此查询:
- 查找属于名称以
Custo...
开头的任何组的所有联系人
- 查找名称以
Custo...
开头的所有组(这应该不是问题)
为了实现这一点,有 2 个读取模型。示例数据:
{contactId: "123", name: "Peter", groups: [{id: "456", name: "Customers"}]}
{groupId: "456", name: "Customers"}
问题:
只能针对单个事件主题保证事件的顺序(就像在 Apache Kafka 中一样)。尽管读取/查询端可以通过多种方式使用这 3 个事件:1,2,3
或 1,3,2
或 3,1,2
如何处理1,2,3
?数据库伪语句示例:
INSERT Contact (contactId: "123", name: "Peter")
FIND Group WHERE (groupId: "456")
(不起作用,因为尚未插入组)
UPDATE Contact WHERE (contactId: "123") ADD Group (groupId: "456", name: "???")
(问题来了)
INSERT Group (groupId: "456", name: "Customers")
想法:
我可以扩展算法并再附加一个语句。这将查找已添加到群组的所有联系人,并将群组名称添加到其中(以使搜索查询有效):
UPDATE Contact WHERE (groupId: "456") REPLACE Group (groupId: "456", name: "Customers")
另一个想法(我不喜欢)可能是只使用一个事件流主题。那么事件的顺序将永远是正确的。 但是在某些情况下这并不容易实现。 (最佳实践还表明,每个聚合应该使用一个主题)
忽略这个问题,因为它不太可能发生,因为用户会在创建组和添加联系人之间提供必要的延迟到组。 但是事件回放没有延迟,事件主题可以并行/'random'顺序消费。
问题:
这种情况应该很常见。但不幸的是,网络上很少有现实世界 CQRS 示例。而且大多数都不会解释小的/隐藏的陷阱。
你是如何解决这些问题的?
How do you solve those problems?
补救措施是避免尝试从事件历史的不稳定表示中重建图像。当您将状态加载到 write 模型中时,您通常会通过查询一个 "document" 来完成此操作,该 "document" 具有按写入顺序排列的所有聚合历史记录。
在读取模型中采用相同的方法,即读取每个主题的稳定事件历史记录,避免了您可能面临的问题,因为主题事件无序到达。
参见 Greg Young 在 polyglot data 上的演讲。
从多个主题构建阅读模型时,您可以采用相同的方法,这将为您提供每个主题的一致历史……但不一定是同步的整体。
因此,要使用您的具体示例,您可能有 ContactCreated (contactId: "123", name: "Peter")
ContactAddedToGroup (contactId: "123", groupId: "456")
,但没有属于 "middle" 的事件。那么现在呢?
一个可能的答案是使用未对齐的历史构建视图 - 您拥有 00:15 的联系信息和 00:00 的组信息,并且您将时间差异作为读模型。这可能包括使用 NullObject
模式的变体来表示尚不存在的对象。
另一种可能性是使用类似 Lamport Clock 的东西来跟踪不同主题中事件之间的依赖关系。这可能看起来像 ContactAddedToGroup
中的元数据,让消费者知道该事件是 GroupCreated
的结果。然后,消费者可以决定是否忽略缺少先例的事件。
在您的示例中,您可以保证在 ContactAddedToGroup (2) 之前添加了 GroupCreated 事件 (3),因为显然用户无法在创建组之前将联系人添加到组中。因此 GroupCreated 事件将可供读取,即使您碰巧先读取了 ContactAddedToGroup 事件。
坚持使用 2 个独立的流(这绝对是正确的,因为群组和联系人是独立的聚合),这是一种方法:
- 联系人可以维护自己的 table 组名称(您的示例中只需要 id 和 name 列)。或者,如果您乐于将群组和联系人结合起来(它们听起来像是同一个限界上下文),您可以只让单个事件处理程序同时处理群组和联系人投影。
- 投影处理程序订阅组和联系人事件(来自单个进程和线程)。
- 如果它读取了群组名称中没有的群组的联系人添加消息 table,它会立即对群组事件执行追赶(或至少追赶得足够远)它确实得到了那个组)然后再次处理联系事件。
此方法在回放和实时处理期间都有效。在回放期间,您还可以选择在开始使用投影的主流(在本例中为 Contact)之前完全使用父流(例如 Group),但您仍然需要准备好在必要时再次赶上 Group 流,因为在追赶期间可能会出现新事件。
如果您有 GroupRenamed 事件,单线程还可以确保没有竞争条件 - 您可以确定您将重命名所有联系人中的列,而对于多线程,您可能会在插入联系人与旧联系人之间发生竞争组名称和更新使用该组的联系人中的所有组名称的查询。如果你需要疯狂的规模,你将不得不分片你的联系人,并让每个分片维护自己的组名 table,以避免竞争条件。
另一种方法是决定组名允许为空,并且只在您阅读事件时更新联系人(您的第一个想法)。因此,您将以几乎相同的方式处理新组和组重命名(如果允许),但您的客户将需要处理联系人中临时为空的组名,这可能是一个不受欢迎的并发症。
我在读取/查询端以正确的顺序(重新)应用来自多个主题的事件时遇到问题。
示例:
在写入/命令端,我们有 2 个具有 n:m 关系的聚合:
- 联系方式
- 群组
这些聚合在 2 个单独的事件流主题 上产生以下事件(因为最佳实践说:每个聚合一个主题。我完全同意):
联系主题:
ContactCreated (contactId: "123", name: "Peter")
ContactAddedToGroup (contactId: "123", groupId: "456")
群主题:
GroupCreated (groupId: "456", name: "Customers")
在读取/查询端(例如 Elasticsearch),我想执行此查询:
- 查找属于名称以
Custo...
开头的任何组的所有联系人
- 查找名称以
Custo...
开头的所有组(这应该不是问题)
为了实现这一点,有 2 个读取模型。示例数据:
{contactId: "123", name: "Peter", groups: [{id: "456", name: "Customers"}]}
{groupId: "456", name: "Customers"}
问题:
只能针对单个事件主题保证事件的顺序(就像在 Apache Kafka 中一样)。尽管读取/查询端可以通过多种方式使用这 3 个事件:1,2,3
或 1,3,2
或 3,1,2
如何处理1,2,3
?数据库伪语句示例:
INSERT Contact (contactId: "123", name: "Peter")
FIND Group WHERE (groupId: "456")
(不起作用,因为尚未插入组)UPDATE Contact WHERE (contactId: "123") ADD Group (groupId: "456", name: "???")
(问题来了)
INSERT Group (groupId: "456", name: "Customers")
想法:
我可以扩展算法并再附加一个语句。这将查找已添加到群组的所有联系人,并将群组名称添加到其中(以使搜索查询有效):
UPDATE Contact WHERE (groupId: "456") REPLACE Group (groupId: "456", name: "Customers")
另一个想法(我不喜欢)可能是只使用一个事件流主题。那么事件的顺序将永远是正确的。 但是在某些情况下这并不容易实现。 (最佳实践还表明,每个聚合应该使用一个主题)
忽略这个问题,因为它不太可能发生,因为用户会在创建组和添加联系人之间提供必要的延迟到组。 但是事件回放没有延迟,事件主题可以并行/'random'顺序消费。
问题:
这种情况应该很常见。但不幸的是,网络上很少有现实世界 CQRS 示例。而且大多数都不会解释小的/隐藏的陷阱。
你是如何解决这些问题的?
How do you solve those problems?
补救措施是避免尝试从事件历史的不稳定表示中重建图像。当您将状态加载到 write 模型中时,您通常会通过查询一个 "document" 来完成此操作,该 "document" 具有按写入顺序排列的所有聚合历史记录。
在读取模型中采用相同的方法,即读取每个主题的稳定事件历史记录,避免了您可能面临的问题,因为主题事件无序到达。
参见 Greg Young 在 polyglot data 上的演讲。
从多个主题构建阅读模型时,您可以采用相同的方法,这将为您提供每个主题的一致历史……但不一定是同步的整体。
因此,要使用您的具体示例,您可能有 ContactCreated (contactId: "123", name: "Peter")
ContactAddedToGroup (contactId: "123", groupId: "456")
,但没有属于 "middle" 的事件。那么现在呢?
一个可能的答案是使用未对齐的历史构建视图 - 您拥有 00:15 的联系信息和 00:00 的组信息,并且您将时间差异作为读模型。这可能包括使用 NullObject
模式的变体来表示尚不存在的对象。
另一种可能性是使用类似 Lamport Clock 的东西来跟踪不同主题中事件之间的依赖关系。这可能看起来像 ContactAddedToGroup
中的元数据,让消费者知道该事件是 GroupCreated
的结果。然后,消费者可以决定是否忽略缺少先例的事件。
在您的示例中,您可以保证在 ContactAddedToGroup (2) 之前添加了 GroupCreated 事件 (3),因为显然用户无法在创建组之前将联系人添加到组中。因此 GroupCreated 事件将可供读取,即使您碰巧先读取了 ContactAddedToGroup 事件。
坚持使用 2 个独立的流(这绝对是正确的,因为群组和联系人是独立的聚合),这是一种方法:
- 联系人可以维护自己的 table 组名称(您的示例中只需要 id 和 name 列)。或者,如果您乐于将群组和联系人结合起来(它们听起来像是同一个限界上下文),您可以只让单个事件处理程序同时处理群组和联系人投影。
- 投影处理程序订阅组和联系人事件(来自单个进程和线程)。
- 如果它读取了群组名称中没有的群组的联系人添加消息 table,它会立即对群组事件执行追赶(或至少追赶得足够远)它确实得到了那个组)然后再次处理联系事件。
此方法在回放和实时处理期间都有效。在回放期间,您还可以选择在开始使用投影的主流(在本例中为 Contact)之前完全使用父流(例如 Group),但您仍然需要准备好在必要时再次赶上 Group 流,因为在追赶期间可能会出现新事件。
如果您有 GroupRenamed 事件,单线程还可以确保没有竞争条件 - 您可以确定您将重命名所有联系人中的列,而对于多线程,您可能会在插入联系人与旧联系人之间发生竞争组名称和更新使用该组的联系人中的所有组名称的查询。如果你需要疯狂的规模,你将不得不分片你的联系人,并让每个分片维护自己的组名 table,以避免竞争条件。
另一种方法是决定组名允许为空,并且只在您阅读事件时更新联系人(您的第一个想法)。因此,您将以几乎相同的方式处理新组和组重命名(如果允许),但您的客户将需要处理联系人中临时为空的组名,这可能是一个不受欢迎的并发症。