在 CQRS 读取端处理乱序事件

Handling out of order events in CQRS read side

我读过乔纳森·奥利弗 (Jonathan Oliver) 写的关于处理乱序事件的精彩 post。

http://blog.jonathanoliver.com/cqrs-out-of-sequence-messages-and-read-models/

The solution that we use is to dequeue a message and to place it in a “holding table” until all messages with a previous sequence are received. When all previous messages have been received we take all messages out of the holding table and run them in sequence through the appropriate handlers. Once all handlers have been executed successfully, we remove the messages from the holding table and commit the updates to the read models.

This works for us because the domain publishes events and marks them with the appropriate sequence number. Without this, the solution below would be much more difficult—if not impossible.

This solution is using a relational database as a persistence storage mechanism, but we’re not using any of the relational aspects of the storage engine. At the same time, there’s a caveat in all of this. If message 2, 3, and 4 arrive but message 1 never does, we don’t apply any of them. The scenario should only happen if there’s an error processing message 1 or if message 1 somehow gets lost. Fortunately, it’s easy enough to correct any errors in our message handlers and re-run the messages. Or, in the case of a lost message, to re-build the read models from the event store directly.

有几个问题,特别是关于他如何说我们总是可以向事件存储询问丢失事件的问题。

  1. CQRS 的写入端是否必须公开读取服务 "demand" 重播事件?例如,如果事件 1 不是 received but but but 2, 4, 3 have 我们可以通过a询问eventstore吗 从 1 开始重新发布事件的服务?
  2. 这个服务是CQRS写端的责任吗?
  3. 我们如何使用它重新构建读取模型?

如果您有序列号,那么您可以检测到当前事件发生故障的情况,例如currentEventNumber != lastReceivedEventNumber + 1

一旦你检测到,你就抛出一个异常。如果您的订户具有 'retries' 的机制,它将在一秒钟左右尝试再次处理此事件。很有可能在此期间处理较早的事件并且顺序是正确的。如果乱序事件很少发生,这是一个解决方案。

如果您经常遇到这种情况,您需要实施全局锁定机制,这将允许顺序处理某些事件。 例如,我们在 MSSQL 中使用 sp_getapplock 来实现某些情况下的全局 "critical section" 行为。当分布式应用程序的多个部分需要的不仅仅是简单的锁时,Apache ZooKeeper 提供了一个框架来处理更复杂的场景。

另一种选择是将您从 (S1) 读取事件的服务提供给服务,使其只能为您的服务 (S2) 产生 in-order 事件。

例如,如果您有许多不同会话的事件负载,则在前端有一个排序服务 (O1) 负责排序。它确保每个会话只有一个事件传递给 (S1),并且只有当 (S1) 和 (S2) 都成功处理时,(O1) 才允许该会话的新事件传递给 (S1)。为了性能也需要排队。

基于时间戳的解决方案:

收到的消息是:

{
 id: 1,
 timestamp: T2,
 name: Samuel
}
{
 id: 1,
 timestamp: T1,
 name: Sam,
 age: 26
}
{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 contact: 123
}

无论数据库中的 ORDER 是什么,我们希望看到的 是:

{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 age: 26,
 contact: 123
}

对于每条收到的消息,请执行以下操作:

  1. 获取持久记录并评估时间戳。
  2. 以时间戳大者为目标。

现在让我们浏览一下消息:

  1. T2先到:将它存储在数据库中,因为它是第一个。
  2. T1 接下来到达:持续的 (T2) & 传入的 (T1),所以 T2 是目标。
  3. T3 到达:持续一个 (T2) 和传入 (T1),因此 T3 是目标。

下面的 deepMerge(src, target) 应该可以给我们结果:

public static JsonObject deepMerge(JsonObject source, JsonObject target) {
    for (String key: source.keySet()) {
        JsonElement srcValue = source.get(key);
        if (!target.has(key)) { // add only when target doesn't have it already
            target.add(key, srcValue);
        } else {
            // handle recursively according to the requirement

        }
    }
    return target;
}

如果您需要完整版的 deepMerge()

,请在评论中告诉我