事件溯源/CQRS 读取模型 - 预测

Event sourcing / CQRS read model - projections

我在 AWS Lambda 上有一个基于微服务的应用程序 运行。其中最关键的两个微服务使用 event-sourcing/cqrs。

背景:(这也是我整理一下思路)

我正在使用 this library 并在 DynamoDB 中存储事件,在 AWS S3 中存储预测。

写入部分就像一个魅力:每个命令调用从 DynamoDB 加载聚合的当前状态(通过 运行 事件通过处理程序 and/or 加载缓存的聚合),它决定根据某些业务逻辑接受或拒绝命令,然后使用 KeyConditionExpression: 'aggregateId = :a AND version >= :v' 写入 DynamoDB,其中版本是为该聚合处理的事件计数。如果存在冲突,则写入失败。对我来说是个不错的系统!

然后每个事件都会广播到 SNS(主题名称是服务名称),以便其他服务可以根据需要对事件做出反应。

我真正纠结的部分是阅读。投影存储在 S3 中,并标记有为每个事件源处理的最后一个 commitId。当读取查询进入时,它从 S3 (对于所有聚合) 加载整个投影状态,查询所有较新事件的事件源,计算最新状态(同样,对于所有聚合- 如果更新的对象更新,则将其写入 S3),以及 returns 基于查询参数的状态的相关部分。

我的问题:(或其中之一)

我想我的预测有误。

我的大部分投影仅按重要属性对 id 进行分组,因此文件相对较小。但我还需要一种方法来检索单个聚合。为此使用投影似乎很疯狂,因为我每次都需要加载整个状态(即每个投影聚合)应用新事件,然后检索我想要的记录(它甚至可能没有改变)。

这就是我现在正在做的,它表现良好(<10 万条记录)但我无法想象它会持续更长时间。

另一个问题是查询。我需要为我需要查询的每个属性构建一个投影映射值来匹配 aggregateIds!!必须有更好的方法!

无论我如何看待这个问题,投影总是需要整个当前状态 + 任何新事件才能 return 即使是一条没有改变的记录。

I think I'm doing projections wrong.

我也这么认为;听起来您的查询与您的预测相结合

When a read query comes in, it loads the entire projected state from S3 (for all aggregates), queries the event sources for all newer events, computes the latest state

是的,听起来一团糟。或者更具体地说,这听起来像是查询正在触发投影完成的工作。

如果您可以将查询与预测分离,那么事情就会变得更容易。基本思想是您的查询不描述 当前 状态,它们描述 上次投影时的状态 运行.

相同的想法,不同的拼写:您从缓存在 S3 中的文档中回答查询。当检测到新事件时,您的预测 运行、根据需要加载新数据、计算新文档并替换缓存中的条目。

我想到了三角形

  • 命令将外部信息带入记录簿
  • 投影将记录簿中的信息带入缓存
  • 查询将缓存中的信息带到外界

其中三角形的每条边 运行 与其他边不同步。

我建议您从查询开始倒退 - 您需要哪些文档来支持每个查询?您必须克服的延迟目标是什么?然后开始权衡取舍 - 对于这个新查询,我是从现有文档创建结果,还是需要一个更细粒度的新文档?

if I understand correctly, I should be triggering the projection updates as events come in, instead of in aggregate when the query is made. That saves me from querying the event store for new events on every query

是的,而且...事件只是一种触发方式;您还可以让投影过程由时钟触发(每 15 分钟检查一次,看看我们是否需要更新)或由人工操作员突发奇想(嗯,看起来您的帐户余额已过时,让我尝试更新一下为你)。不止一种方法,您可以混合搭配策略。

I would still need to load the entire state, both when updating the projection, and when loading a single aggregate.

不一定。没有规则说您不能使用以前缓存的表示作为起点,然后仅从记录簿中提取您需要的更改。

例如,假设您正在构建一个结合聚合 A{id:7}B{id:9} 的视图。你抓取缓存的副本,查看它的元数据(你把它放在你以前写入的地方)并在其中找到像 metadata:{A:{id:7, version:21}, B:{id:9, version:19}} 这样的东西。现在你只需要加载上次使用的事件之后的事件,更新内存中的本地副本,更新元数据的本地副本,并将批次推送到缓存。

我不熟悉你们的技术基础设施,但我实现预测的方式如下:

每个域事件都有一个跨越所有聚合根的全局序列号。投影是一种读取模型,具有任意名称和由该全局序列号表示的最后处理位置。我可以随时添加一个新的投影及其事件处理程序,它将从位置 0 开始。我可以随时清除投影并将位置设置回 0。我还可以结合使用添加一个新的将替换现有投影的投影,即使需要几天时间也要进行构建,然后删除旧投影。

有一个服务可以监视投影并几乎像队列一样使用事件存储。投影服务会在当前位置 之后检查具有全局 ID 的事件,并将这些事件交给处理程序,然后更新位置。这是您的投影甚至可以过滤事件类型以提高性能的地方。

这是基本思路。您的预测就是您查询的内容。一旦投影赶上事件存储的 "head",来自事件存储的事件将被滴入投影中。

这将如何转化为您的技术 space 我不太确定。我有一个名为 Shuttle.Recall 的 C# 实验,如果你想看看以获得一些想法。