EventSourced Saga 实现

EventSourced Saga Implementation

我已经编写了一个事件源聚合,现在实现了一个事件源传奇...我注意到这两者很相似,并创建了一个事件源对象作为基础class,两者都从中派生。

我在这里看过一个演示 http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/ 但感觉可能存在问题,因为命令的发送是在写事务之外进行的,因此在进程崩溃的情况下命令可能会丢失?

public void Save(ISaga saga)
{
    var events = saga.GetUncommittedEvents();
    eventStore.Write(new UncommittedEventStream
    {
        Id = saga.Id,
        Type = saga.GetType(),
        Events = events,
        ExpectedVersion = saga.Version - events.Count
    });

    foreach (var message in saga.GetUndispatchedMessages())
        bus.Send(message); // can be done in different ways

    saga.ClearUncommittedEvents();
    saga.ClearUndispatchedMessages();
}

相反,我使用的是 Greg Young 的 EventStore,当我保存 EventSourcedObject(聚合或传奇)时,序列如下:

  1. 存储库获取新的 MutatingEvents 列表。
  2. 将它们写入流。
  3. 当流被写入并提交到流时,EventStore 会触发新事件。
  4. 我们从 EventStore 中监听事件并在 EventHandlers 中处理它们。

我正在实现传奇的两个方面:

  1. 接受事件,这可能转换状态,这反过来可能发出命令.
  2. 要有一个警报,以便在将来的某个时候(通过外部计时器服务)我们可以回电。

问题

  1. 据我所知,事件处理程序不应发出命令(如果命令失败会发生什么情况?)-但我可以接受上述内容吗,因为 Saga 是通过此 event 代理控制命令创建(响应事件)的实际事物,并且命令发送的任何失败都可以在外部处理(在处理 [=14 的外部 EventHandler 中) =] 并在命令失败时重新发送)?

  2. 或者我忘记包装事件并将本机 CommandsEvents 存储在同一个流中(与基础 class 消息混合 - Saga 会消耗命令和事件,聚合只会消耗事件)?

  3. 网上还有其他关于事件源 Sagas 实施的参考资料 material 吗?有什么我可以理智地检查我的想法的吗?

部分后台代码如下

Saga 向 运行 发出命令(包装在 CommandEmittedFromSaga 事件中)

下面的命令包含在事件中:

public class CommandEmittedFromSaga : Event
{
    public readonly Command Command;
    public readonly Identity SagaIdentity;
    public readonly Type SagaType;

    public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
    {
        Command = command;
        SagaType = sagaType;
        SagaIdentity = sagaIdentity;
    }
}

Saga 请求在未来某个时间回调(AlarmRequestedBySaga 事件)

警报回调请求被包裹在一个事件中,并将在请求的时间或之后触发并事件到 Saga:

public class AlarmRequestedBySaga : Event
{
    public readonly Event Event;
    public readonly DateTime FireOn;
    public readonly Identity Identity;
    public readonly Type SagaType;

    public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
    {
        Identity = identity;
        SagaType = sagaType;
        Event = @event;
        FireOn = fireOn;
    }
}

或者我可以将命令和事件存储在相同的基本类型消息流中

public abstract class EventSourcedSaga
{
    protected EventSourcedSaga() { }

    protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
    {
        Identity = id;

        if (messages == null) throw new ArgumentNullException(nameof(messages));

        var count = 0;

        foreach (var message in messages)
        {
            var ev = message as Event;
            var command = message as Command;

            if(ev != null) Transition(ev);
            else if(command != null) _messages.Add(command);
            else throw new Exception($"Unsupported message type {message.GetType()}");

            count++;
        }

        if (count == 0)
            throw new ArgumentException("No messages provided");

        // All we need to know is the original number of events this
        // entity has had applied at time of construction.
        _unmutatedVersion = count;
        _constructing = false;
    }

    readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
    readonly List<Message> _messages = new List<Message>();
    readonly int _unmutatedVersion;
    private readonly bool _constructing = true;
    public readonly Identity Identity;

    public IList<Message> GetMessages()
    {
        return _messages.ToArray();
    }

    public void Transition(Event e)
    {
        _messages.Add(e);
        _dispatcher.Dispatch(this, e);
    }

    protected void SendCommand(Command c)
    {
        // Don't add a command whilst we are in the constructor. Message
        // state transition during construction must not generate new
        // commands, as those command will already be in the message list.
        if (_constructing) return;

        _messages.Add(c);
    }

    public int UnmutatedVersion() => _unmutatedVersion;
}

我认为前两个问题是对流程管理器(又名 Sagas,请参阅底部的术语注释)错误理解的结果。

改变你的想法

您似乎正在尝试将其建模(就像我曾经做过的那样)作为逆聚合。问题在于:聚合的 "social contract" 是它的输入(命令)可以随时间变化(因为系统必须能够随时间变化),但它的输出(事件)不能。一旦写入,事件就是历史问题,系统必须始终能够处理它们。有了这个条件,就可以从不可变的事件流中可靠地加载聚合。

如果您尝试将输入和输出反转为流程管理器实现,则它的输出不能记录在案,因为随着时间的推移,命令可能会被弃用并从系统中删除。当您尝试使用已删除的命令加载流时,它会崩溃。因此,无法从不可变消息流中可靠地重新加载建模为反向聚合的流程管理器。 (好吧,我相信你能想出一个办法……但这明智吗?)

因此,让我们通过查看流程管理器替换的内容来考虑实施流程管理器。以管理订单履行等流程的员工为例。您为该用户做的第一件事是在 UI 中设置一个视图供他们查看。您要做的第二件事是在 UI 中创建按钮,以便用户根据他们在视图中看到的内容执行操作。前任。 "This row has PaymentFailed, so I click CancelOrder. This row has PaymentSucceeded and OrderItemOutOfStock, so I click ChangeToBackOrder. This order is Pending and 1 day old, so I click FlagOrderForReview"...等等。一旦决策过程定义明确并且开始需要用户花费太多时间,您的任务就是使该过程自动化。为了自动化,其他一切都可以保持不变(视图,甚至一些UI所以你可以检查它),但是用户已经变成了一段代码。

"Go away or I will replace you with a very small shell script."

进程管理器代码现在会定期读取视图,并可能在存在特定数据条件时发出命令。本质上,流程管理器的最简单版本是一些在计时器(例如每小时)上运行并取决于特定视图的代码。这就是我要开始的地方......使用您已经拥有的东西(views/view 更新程序)和最少的添加(定期运行的代码)。即使您稍后决定对某些用例需要不同的功能,"Future You" 也会更好地了解需要解决的具体缺点。

这是一个让您想起 Gall's law 以及 YAGNI 的好地方。

  1. Any other reference material on the net for implementation of event sourced Sagas? Anything I can sanity check my ideas against?

好的 material 很难找到,因为这些概念具有非常可塑的实现,并且有各种各样的例子,其中许多为了通用目的而过度设计。但是,这里有一些我在答案中使用的参考资料。

DDD - Evolving Business Processes
DDD/CQRS Google Group(大量阅读material)


注意术语 Saga 与流程管理器的含义不同。一个常见的 saga 实现基本上是一个路由单,每个步骤及其相应的故障补偿都包含在单据中。这取决于路由单的每个接收者执行路由单上指定的内容并将其成功传递到下一跳或执行故障补偿并向后路由。在处理由不同组管理的多个系统时,这可能有点过于乐观,因此通常使用进程管理器来代替。 See this SO question 获取更多信息。