处理多个事件的状态转换

Handling transition to state for multiple events

我有一个 MassTransitStateMachine,它编排了一个涉及创建多个事件的过程。

完成所有事件后,我希望状态过渡到 'clean up' 阶段。

这里是相关的状态声明和过滤函数:

        During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark this source as done. 
                .Then(MarkImportCompletedForLocation),

            When(DataImported, IsAllDataImported)
                // Once all are done, we can transition to cleaning up...
                .Then(CleanUpSources)
                .TransitionTo(CleaningUp)
        );


    ...snip...


    private static bool IsAllDataImported(EventContext<DataImportSagaState, DataImportMappingCompletedEvent> ctx)
    {
        return ctx.Instance.Locations.Values.All(x => x);
    }

因此,当状态为 ImportingData 时,我希望收到多个 DataImported 事件。每个事件都将其位置标记为已完成,以便 IsAllDataImported 方法可以确定我们是否应该转换到下一个状态。

然而,如果最后两个 DataImported 事件同时到达,过渡到 CleaningUp 阶段的处理程序会触发两次,最后我尝试执行两次清理。

可以用我自己的代码解决这个问题,但我希望状态机能够管理这个问题。我是做错了什么,还是只需要自己处理争用?

您可以使用复合事件将多个事件累积到一个后续事件中,该后续事件在触发相关事件时触发。这是使用以下定义的:

CompositeEvent(() => AllDataImported, x => x.ImportStatus, DataImported, MoreDataImported);

During(ImportingData,
    When(DataImported)
        .Then(context => { do something with data }),
    When(MoreDataImported)
        .Then(context => { do smoething with more data}),
    When(AllDataImported)
        .Then(context => { okay, have all data now}));

然后,在你的状态机状态实例中:

class DataImportSagaState :
    SagaStateMachineInstance
{
    public int ImportStatus { get; set; }
}

这应该可以解决您要解决的问题,所以请试一试。请注意,事件顺序无关紧要,它们可以以任何顺序到达,因为已接收事件的状态在实例的 ImportStatus 属性 中。

单个事件的数据未保存,因此您需要使用 .Then() 方法自行将其捕获到状态实例中。

Chris 提出的解决方案不适用于我的情况,因为我有多个相同类型的事件到达。只有当所有这些事件都发生时,我才需要过渡。 CompositeEvent 构造不适用于此用例。

我的解决方案是在 MarkImportCompletedForLocation 方法期间引发新的 AllDataImported 事件。此方法现在处理确定所有子导入是否以线程安全的方式完成。

所以我的状态机定义是:

            During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark the URI in the locations list as done. 
                .Then(MarkImportCompletedForLocation),

            When(AllDataImported)
                // Once all are done, we can transition to cleaning up...
                .TransitionTo(CleaningUp)
                .Then(CleanUpSources)
        );

不再需要 IsAllDataImported 方法作为过滤器。

传奇状态有一个位置 属性:

public Dictionary<Uri, bool> Locations { get; set; }

而MarkImportCompletedForLocation方法定义如下:

    private void MarkImportCompletedForLocation(BehaviorContext<DataImportSagaState, DataImportedEvent> ctx)
    {
        lock (ctx.Instance.Locations)
        {
            ctx.Instance.Locations[ctx.Data.ImportSource] = true;
            if (ctx.Instance.Locations.Values.All(x => x))
            {
                var allDataImported = new AllDataImportedEvent {CorrelationId = ctx.Instance.CorrelationId};
                this.CreateEventLift(AllDataImported).Raise(ctx.Instance, allDataImported);
            }
        }
    }

(我写这个是为了了解一般流程是如何工作的;我认识到 MarkImportCompletedForLocation 方法需要通过验证字典中是否存在键来加强防御。)