DDD - 并发和命令重试有副作用

DDD - Concurrency and Command retrying with side-effects

我正在开发一个事件源电动汽车充电站管理系统,它连接到多个充电站。在这个域中,我提出了充电站的聚合,其中包括充电站的内部状态(是否联网,如果汽车正在使用充电站的连接器充电)。

站点通过标准化协议中定义的消息通知我其状态:

并且我的服务器可以向这个站发送命令:

我已经为这个充电站开发了一个聚合体。它包含其连接器的内部实体,无论是否充电,如果电源系统出现问题,...

Aggregate 的内存表示驻留在我控制的服务器中,而不是充电站本身,它有一个 StationClient 服务,负责将这些命令发送到物理充电站(伪代码):

class StationAggregate {
  stationClient: StationClient
  URL: string
  connector: Connector[]

  unlock(connectorId) {
    if this.connectors.find(connectorId).isAvailableToBeUnlocked() {
      return ErrorConnectorNotAvailable
    }
    error = this.stationClient.sendRemoteStartTransaction(this.URL, connectorId)
    if error {
      return ErrorStationRejectedUnlock
    }
    this.applyEvents([
      StationUnlockedEvent(connectorId, now())
    ])
    return Ok
  }

  receiveHeartbeat(timestamp) {
    this.applyEvents([
      StationSentHeartbeat(timestamp)
    ])
    return Ok
  }
}

我正在使用乐观并发,这意味着,我从事件列表中加载聚合,并将聚合的当前版本存储在其内存表示中:版本 #2032 中的 StationAggregate,当命令是成功处理并应用事件,例如,它将在版本 #2033 中。这样,我就可以在持久层上对 (StationID, Version) 元组进行唯一约束,并保证只持久化一个事件。

如果有任何机会,会收到 Heartbeat 消息,并收到 Unlock 命令。在两个线程中,它们将加载 StationAggregate 并且都在版本 X 中,在 Heartbeat 接收的情况下,不会有副作用,但在 [= =29=]Unlock命令,会有一个副作用告诉物理充电站被解锁。然而,当我使用乐观并发时,StationUnlocked 事件可能会被持久层拒绝。我不知道我该如何处理,因为我无法重试命令,因为它 inherently not idempotent(as the physical站将拒绝第二个请求)

我不知道我是否建模有误,或者它是否真的是一个很难建模的领域。

我不确定我是否完全理解这个问题,但乐观并发的想法是防止在竞争条件下写入。版本用于确保您的写入操作的版本是您在执行命令之前从数据库中获得的版本的 +1。

因此,如果有一个并行写入成功并且您从事件存储中返回了错误的版本异常,您可以完全重试命令执行,这意味着您再次读取流并通过这样做获得最新状态与新版本。然后,您将命令发送给聚合,它决定执行该操作是否有意义。

这个问题与事件溯源没有特别的关系,它与任何持久性都相关,并且以相同的方式解决。

事件溯源可以为您带来额外的好处,因为您知道 发生了什么。想象一下,你偶然得到了两次 Unlock 命令。当您从商店取回 "wrong version" 时,您可以读取最后一个事件并确定该命令是否已经执行。它可以在逻辑上完成(如果它已经被同一客户解锁,则无需解锁),在技术上(将命令 id 放入事件元数据并进行比较),或两种方式都可以。

在处理重复的命令时,确保命令处理的幂等性水平是有意义的,忽略重复并且 return OK 而不是在用户面前失败。

我可以从关于域的非常有限的信息量中推断出的另一个观察结果是,心跳是遥测,锁定和解锁是业务。我认为将这两个截然不同的东西组合到一个域对象中没有多大意义。

更新,关注评论中的讨论:

您在生成事件的同时向站发送命令得到的是两阶段提交的变体。由于它不在事务中执行,因此这两个操作中的任何一个都可能失败并导致系统进入不一致状态。如果命令发送失败,您要么不知道站点是否收到了解锁自身的命令,要么如果事件持久化失败,您不知道它已解锁。你只得到了第二次操作,但第一种情况也可能发生。

有很多方法可以解决。

首先,完全从技术上解决。使用 MassTransit,使用 Outbox 很容易修复。在原始消息的消费者完全完成其工作之前,它不会发送任何传出消息。因此,如果 Unlock 命令的消费者未能持久化事件,则不会发送该命令。然后,重试过滤器将启动,整个操作将再次执行,您已经脱离竞争条件,因此操作将正确完成。

但是当你发送到物理站的命令发送失败时,它并不能解决问题(我认为这是一种边缘情况)。

这个问题也很容易解决,这里事件溯源很有帮助。您需要将发送命令的方式从原来的(用户驱动的)命令使用者转换为订阅者。你订阅了StationUnlocked事件的事件流,让订阅者向站台发送命令。这样一来,您只会在事件持续存在的情况下向站点发送命令,并且您可以根据需要多次重试发送命令。

最后,你可以用更有意义的方式解决它,改变语义。我已经提到心跳是遥测消息。我希望该站也能响应锁定和解锁命令,告诉你它是否真的按照你的要求做了。

您可以使用站点遥测来创建物理站点的表示,它不是聚合的一部分。实际上,它更像是物理世界的ACL,表示为读取模型。

当你身边有这样一个物理站的镜像时,当你在你的域中执行Unlock命令时,你可以聘请一个域服务器来咨询当前的站点状态并做出决定.如果您发现该站已经解锁并且会话 ID 匹配(是的,我记得我们之前的讨论:))-您 return OK 并安全地忽略该命令。如果它被锁定 - 你继续。如果它已解锁并且会话 ID 不匹配 - 这显然是一个错误,您需要执行其他操作。

在最后一个选项中,您可以明确地将遥测处理与业务分开,这样心跳就不会影响您的域模型,因此您真的不会遇到版本控制问题。您也将始终有一个地方可以查看以了解物理站的当前状态。