MassTransit Consumer Saga 能否通过其编排的相同消息启动?

Can a MassTransit Consumer Saga be InitiatedBy the same message(s) that it Orchestrates?

Event Hub Riders in 7.0 plus the existing InMemoryRepository backing for Sagas looks like it could provide a straightforward means of creating aggregate states based on a stream of correlated messages, e.g. across all sensors in a Building). In this scenario, the Building's Identifier would be used as the CorrelationId of the Messages, the Saga, and as the PartitionKey of the EventData messages sent to the Event Hub, ensuring the same consuming service instance receives all messages for that device at a given time. Given the way Event Hub's rebalancing works 的新支持,可以假设在某个时候当此服务 运行 时,管理分区消息的服务实例将转移到新主机,这将开始读取建筑物中传感器发送的消息。那一刻:

  1. 新主机对旧主机的处理一无所知。它只知道它现在正在接收事件中心分区的消息,其中包括该建筑物的消息。
  2. 发送消息的设备对“它们的下游”状态聚合责任的转换一无所知——它们仍然一如既往地愉快地报告新的测量结果。

这带来的挑战是:在新的服务实例上,我们需要创建一个新的 Saga 来接管之前的 Saga,但唯一知道给定实体没有 Saga 的是 MassTransit:没有在新实例上知道从 A 楼读取的传感器是第一个从 A 楼读取的传感器,因为此服务实例接管了跟踪聚合 A 楼状态。我们认为这可以通过用 InitiatedByOrchestrates 标记相同的消息 (DataCollected) 来处理:

public class BuildingAggregator:
            ISaga,
            InitiatedBy<DataCollected>, //init saga on first DataCollected with a given CorrelationId seen
            Orchestrates<DataCollected> //then keep handling those in that saga
{
     //saga Consume methods
}

但是,当 BuildingAggregator 收到第二条带有给定 Guid 的 DataCollected 消息时,会引发以下异常:

Saga exception on receipt of MassTransitFW_POC.Program+DataCollected: The message cannot be accepted by an existing saga
   at MassTransit.Saga.Policies.NewSagaPolicy`2.MassTransit.Saga.ISagaPolicy<TSaga,TMessage>.Existing(SagaConsumeContext`2 context, IPipe`1 next)
   at MassTransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at MassTransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.<Send>d__4`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---

有没有其他方法可以实现这个逻辑?这是应用 Sagas 的“错误方式”吗?

根据 Chris Patterson's 对上述问题的评论,这可以通过状态机语法实现:

Initially(
    When(DataCollected)
        .Then(f => _logger.LogInformation("Initiating Network Manager for Network: {NetworkId}", f.Data.NetworkId))
        .TransitionTo(Running));
During(Running,
    When(DataCollected)
        .Then(f => { // activities and state transitions }),
    When(SimulationComplete)
        .Then(f => _logger.LogInformation("Network {NetworkId} shutting down.", f.Instance.CorrelationId))
        .TransitionTo(Final));

请注意 DataCollected 事件的处理方式 在 Initially 状态转换和 Initially 条件设置的状态转换中。