拉各姆 | Return 来自读端处理器的值

Lagom | Return Values from read side processor

我们正在使用 Lagom 来开发我们的微服务集。这里的技巧是,虽然我们使用事件源并将事件持久化到 cassandra 中,但我们也必须将数据存储在图形数据库之一中,因为由于用例,它将成为大多数查询的服务对象.

根据 Lagom 的文档,在命令处理程序将事件持久化到 cassandra 之后,必须在 ReadSideProcecssor 中完成对图形数据库(或任何其他数据库)的所有插入,遵循 CQRS 的理念。

下面是我们面临的问题。我们认为 ReadSideProcecssor 是一个监听器,它在事件生成并持久化后被触发。我们想要的是我们可以 return 从 ReadSideProcecssor 返回到 ServiceImpl 的响应。例如,当用户被添加到系统时,图表生成的唯一 ID 必须 returned 作为响应之一 headers。由于响应是从 setCommandHandler 而不是 ReadSideProcessor.

构建的,因此如何在 Lagom 中实现这一点

此外,我们需要确保如果由于图形端的任何错误,API 应该通知客户端请求失败,但不会传播 ReadSideProcessor 中发生的异常PersistentEntityServiceImpl class。如何实现?

非常感谢任何帮助。

读取端处理器不是附加到命令的侦听器 - 它实际上与持久实体完全断开连接,它可能 运行 在不同的节点上,在不同的时间,甚至可能几年后,如果你添加一个新的读取端处理器,它首先跟上历史上所有旧事件的速度。如果读取端处理器同步连接到命令,那么它就不会是 CQRS,命令端和查询端之间不会有隔离。

读取侧处理器实质上是轮询数据库以查找新事件,并在检测到新事件时对其进行处理。您可以随时添加一个新的读取端处理器,它会从所有历史记录中获取所有事件,而不仅仅是添加的新事件,这是事件溯源的一大优点,您无需预期您从一开始就需要所有查询,您可以在查询需要时添加它们。

进一步解释为什么您不想在两者之间建立连接 - 如果事件持续成功但图形数据库更新失败会怎样?也许图形数据库崩溃了。该命令是否必须重试?是否必须删除该事件?如果执行更新的节点在有机会修复问题之前崩溃了,会发生什么情况?现在您的读取端与您的实体处于不一致状态。将它们连接起来会导致许多故障情况下的不一致——例如,当您向公用事业公司更新地址时,但您的帐单仍会转到旧地址,您联系他们,他们说 "yes, your new address is updated in our system",但是他们仍然会转到旧地址 - 如果您尝试将读取端和写入端连接在一起,那是您为用户注册的那种糟糕的用户体验。断开连接允许 Lagom 确保您在写入端发出的事件与读取端消耗它们之间的一致性。

所以为了解决您的具体问题:ID 生成应该在写入端完成,或者,如果后续 ID 在读取端生成,它还应该提供一种将写入端的 ID 映射到读取端 ID。至于在读取端处理错误 - 所有验证都应在写入端完成 - 写入端应确保它永远不会发出无效事件。

现在如果读端处理器遇到无效的东西,那么它有两个选择。一种选择是它可能会失败。在许多情况下,这是一个不错的选择,因为如果某些内容无效或不一致,则很可能是您遇到了错误或某种形式的损坏。您不想做的是继续处理,就好像一切都很开心,因为这可能会使数据损坏或不一致变得更糟。相反,读取端处理器会停止,然后您的监控应该会检测到错误,您可以进去找出错误是什么或修复损坏。当然,这样做也有缺点,您的读取端将开始落后于写入端,同时无法处理新事件。但这也是 CQRS 的一个优势——写入端能够继续工作,继续执行一致性等,故障只是隔离到读取端,并且只更新读取端。你的整个系统不会因为这个错误而崩溃并拒绝接受新请求,而是将它隔离到问题所在。

读取端的另一个选项是它可以将错误存储在某个地方——例如,将事件存储在死信中table,或者提出某种故障单,然后继续处理。这样,您就可以在事后修复事件。这确保了更高的可用性,但确实存在这样的风险,即如果它未能处理的那个事件对后续事件的处理很重要,那么您可能只会让自己陷入更大的混乱之中。

现在这确实对您能做什么和不能做什么引入了具体的限制,但我无法真正预料到那些不了解您的用例的具体知识的人知道如何解决这些问题。一个常见的约束是设置验证——例如,您如何确保电子邮件地址对于您系统中的单个用户是唯一的? Greg Young(CQRS 专家)写了这篇博客 post 关于这些类型的问题:

http://codebetter.com/gregyoung/2010/08/12/eventual-consistency-and-set-validation/