处理多个事件的状态转换
Handling transition to state for multiple events
我有一个 MassTransitStateMachine,它编排了一个涉及创建多个事件的过程。
完成所有事件后,我希望状态过渡到 'clean up' 阶段。
这里是相关的状态声明和过滤函数:
During(ImportingData,
When(DataImported)
// When we get a data imported event, mark this source as done.
.Then(MarkImportCompletedForLocation),
When(DataImported, IsAllDataImported)
// Once all are done, we can transition to cleaning up...
.Then(CleanUpSources)
.TransitionTo(CleaningUp)
);
...snip...
private static bool IsAllDataImported(EventContext<DataImportSagaState, DataImportMappingCompletedEvent> ctx)
{
return ctx.Instance.Locations.Values.All(x => x);
}
因此,当状态为 ImportingData 时,我希望收到多个 DataImported 事件。每个事件都将其位置标记为已完成,以便 IsAllDataImported 方法可以确定我们是否应该转换到下一个状态。
然而,如果最后两个 DataImported 事件同时到达,过渡到 CleaningUp 阶段的处理程序会触发两次,最后我尝试执行两次清理。
我可以用我自己的代码解决这个问题,但我希望状态机能够管理这个问题。我是做错了什么,还是只需要自己处理争用?
您可以使用复合事件将多个事件累积到一个后续事件中,该后续事件在触发相关事件时触发。这是使用以下定义的:
CompositeEvent(() => AllDataImported, x => x.ImportStatus, DataImported, MoreDataImported);
During(ImportingData,
When(DataImported)
.Then(context => { do something with data }),
When(MoreDataImported)
.Then(context => { do smoething with more data}),
When(AllDataImported)
.Then(context => { okay, have all data now}));
然后,在你的状态机状态实例中:
class DataImportSagaState :
SagaStateMachineInstance
{
public int ImportStatus { get; set; }
}
这应该可以解决您要解决的问题,所以请试一试。请注意,事件顺序无关紧要,它们可以以任何顺序到达,因为已接收事件的状态在实例的 ImportStatus 属性 中。
单个事件的数据未保存,因此您需要使用 .Then()
方法自行将其捕获到状态实例中。
Chris 提出的解决方案不适用于我的情况,因为我有多个相同类型的事件到达。只有当所有这些事件都发生时,我才需要过渡。 CompositeEvent 构造不适用于此用例。
我的解决方案是在 MarkImportCompletedForLocation 方法期间引发新的 AllDataImported 事件。此方法现在处理确定所有子导入是否以线程安全的方式完成。
所以我的状态机定义是:
During(ImportingData,
When(DataImported)
// When we get a data imported event, mark the URI in the locations list as done.
.Then(MarkImportCompletedForLocation),
When(AllDataImported)
// Once all are done, we can transition to cleaning up...
.TransitionTo(CleaningUp)
.Then(CleanUpSources)
);
不再需要 IsAllDataImported 方法作为过滤器。
传奇状态有一个位置 属性:
public Dictionary<Uri, bool> Locations { get; set; }
而MarkImportCompletedForLocation方法定义如下:
private void MarkImportCompletedForLocation(BehaviorContext<DataImportSagaState, DataImportedEvent> ctx)
{
lock (ctx.Instance.Locations)
{
ctx.Instance.Locations[ctx.Data.ImportSource] = true;
if (ctx.Instance.Locations.Values.All(x => x))
{
var allDataImported = new AllDataImportedEvent {CorrelationId = ctx.Instance.CorrelationId};
this.CreateEventLift(AllDataImported).Raise(ctx.Instance, allDataImported);
}
}
}
(我写这个是为了了解一般流程是如何工作的;我认识到 MarkImportCompletedForLocation 方法需要通过验证字典中是否存在键来加强防御。)
我有一个 MassTransitStateMachine,它编排了一个涉及创建多个事件的过程。
完成所有事件后,我希望状态过渡到 'clean up' 阶段。
这里是相关的状态声明和过滤函数:
During(ImportingData,
When(DataImported)
// When we get a data imported event, mark this source as done.
.Then(MarkImportCompletedForLocation),
When(DataImported, IsAllDataImported)
// Once all are done, we can transition to cleaning up...
.Then(CleanUpSources)
.TransitionTo(CleaningUp)
);
...snip...
private static bool IsAllDataImported(EventContext<DataImportSagaState, DataImportMappingCompletedEvent> ctx)
{
return ctx.Instance.Locations.Values.All(x => x);
}
因此,当状态为 ImportingData 时,我希望收到多个 DataImported 事件。每个事件都将其位置标记为已完成,以便 IsAllDataImported 方法可以确定我们是否应该转换到下一个状态。
然而,如果最后两个 DataImported 事件同时到达,过渡到 CleaningUp 阶段的处理程序会触发两次,最后我尝试执行两次清理。
我可以用我自己的代码解决这个问题,但我希望状态机能够管理这个问题。我是做错了什么,还是只需要自己处理争用?
您可以使用复合事件将多个事件累积到一个后续事件中,该后续事件在触发相关事件时触发。这是使用以下定义的:
CompositeEvent(() => AllDataImported, x => x.ImportStatus, DataImported, MoreDataImported);
During(ImportingData,
When(DataImported)
.Then(context => { do something with data }),
When(MoreDataImported)
.Then(context => { do smoething with more data}),
When(AllDataImported)
.Then(context => { okay, have all data now}));
然后,在你的状态机状态实例中:
class DataImportSagaState :
SagaStateMachineInstance
{
public int ImportStatus { get; set; }
}
这应该可以解决您要解决的问题,所以请试一试。请注意,事件顺序无关紧要,它们可以以任何顺序到达,因为已接收事件的状态在实例的 ImportStatus 属性 中。
单个事件的数据未保存,因此您需要使用 .Then()
方法自行将其捕获到状态实例中。
Chris 提出的解决方案不适用于我的情况,因为我有多个相同类型的事件到达。只有当所有这些事件都发生时,我才需要过渡。 CompositeEvent 构造不适用于此用例。
我的解决方案是在 MarkImportCompletedForLocation 方法期间引发新的 AllDataImported 事件。此方法现在处理确定所有子导入是否以线程安全的方式完成。
所以我的状态机定义是:
During(ImportingData,
When(DataImported)
// When we get a data imported event, mark the URI in the locations list as done.
.Then(MarkImportCompletedForLocation),
When(AllDataImported)
// Once all are done, we can transition to cleaning up...
.TransitionTo(CleaningUp)
.Then(CleanUpSources)
);
不再需要 IsAllDataImported 方法作为过滤器。
传奇状态有一个位置 属性:
public Dictionary<Uri, bool> Locations { get; set; }
而MarkImportCompletedForLocation方法定义如下:
private void MarkImportCompletedForLocation(BehaviorContext<DataImportSagaState, DataImportedEvent> ctx)
{
lock (ctx.Instance.Locations)
{
ctx.Instance.Locations[ctx.Data.ImportSource] = true;
if (ctx.Instance.Locations.Values.All(x => x))
{
var allDataImported = new AllDataImportedEvent {CorrelationId = ctx.Instance.CorrelationId};
this.CreateEventLift(AllDataImported).Raise(ctx.Instance, allDataImported);
}
}
}
(我写这个是为了了解一般流程是如何工作的;我认识到 MarkImportCompletedForLocation 方法需要通过验证字典中是否存在键来加强防御。)