Lagom:命令处理程序中的异步操作

Lagom: Asynchronous Operations in Command Handlers

在Lagom中,当命令处理程序必须执行一些异步操作时,你会怎么做?例如:

override def behavior = Actions().onCommand[MyCommand, Done] {
  case (cmd, ctx, state) =>
    // some complex code that performs asynchronous operations
    // (for example, querying other persistent entities or the read-side
    // by making calls that return Future[...] and composing those),
    // as summarized in a placeholder below:
    val events: Future[Seq[Event]] = ???
    events map {
      xs => ctx.thenPersistAll(xs: _*) { () => ctx.reply(Done) }
    }
}

这样的代码的问题在于,编译器希望命令处理程序为 return Persist,而不是 Future[Persist]

这是故意的,以确保事件以正确的顺序持久化(也就是说,先前命令生成的事件必须在后面的命令生成的事件之前保存)?但这不能通过适当管理事件偏移量来处理,以便日志始终正确排序它们,而不管它们实际保存的时间?

在这种情况下,当命令处理复杂到需要从命令处理程序进行异步调用时,人们会怎么做?

邮件列表中有一个类似的问题,James 已经给出了答案。 https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!topic/lagom-framework/Z6lynjNTqgE

简而言之,您在 CQRS 应用程序中的实体是一个一致性边界,应该只依赖于它在内部立即可用的数据,而不是外部(不调用外部服务)。

您可能正在寻找所谓的 Command Enrichment。您收到一个请求,从外部服务收集一些数据并构建一个命令,其中包含您需要发送给您的实体的所有内容。

您当然不应该通过查询读取端来在您的写入端实体内部做出业务决策。您也不应根据来自其他实体的数据做出业务决策。

您的实体应该能够做出所有决定,因为它是您模型的一致性边界。

在这些情况下,我一直在做的是将 PersistentEntityRef 传递给异步操作,以便它可以向实体及其那些命令处理程序(不是产生异步计算的那个)发出命令然后持久化事件。

请记住,其中的 none 是原子的,因此您必须考虑如果异步操作在发出命令的中途失败或者某些命令成功而某些命令失败等情况下会发生什么。大概您需要一些针对系统故障的重试机制。如果您将命令处理程序构建为幂等的,它将帮助您处理重复项。