MassTransit 中的单身传奇

Singleton saga in MassTransit

我想做的是只创建一个 saga 实例,不被后续事件更新。因此,例如,如果我在 09:00 发送 EventACorrelationId 等于 979158a2-dfa0-45f0-9107-8b8b8028bb9f,然后该事件的传奇在 10:00 仍然存在,那么如果在 10:00 我发送了另一个 EventA 的实例,它与之前发送的 CorrelationId 相同,我不希望它覆盖之前的传奇状态,而是我希望这个事件是被MT忽略。我有点实现了,因为我的 saga 状态没有被覆盖(只有它的版本增加了),但问题是 MT 在新事件到达时抛出错误。例外是(+上面的一个日志条目):

[18:41:12 DBG] SAGA:App.Services.Communicators.Amazon.Dispatcher.Api.Sagas.GetOrdersState:0368a723-a819-450f-b6e5-9211d1d6a3a9 Dupe App.Services.Financial.Contracts.GetOrdersCommand
MongoDB.Driver.MongoWriteException: A write operation resulted in an error.
  E11000 duplicate key error collection: Dispatcher.get.orders.states index: _id_ dup key: { _id: BinData(3, 23A7680319A80F45B6E59211D1D6A3A9) }
 ---> MongoDB.Driver.MongoBulkWriteException`1[App.Services.Communicators.Amazon.Dispatcher.Api.Sagas.GetOrdersState]: A bulk write operation resulted in one or more errors.
  E11000 duplicate key error collection: Dispatcher.get.orders.states index: _id_ dup key: { _id: BinData(3, 23A7680319A80F45B6E59211D1D6A3A9) }
   at MongoDB.Driver.MongoCollectionImpl`1.BulkWriteAsync(IClientSessionHandle session, IEnumerable`1 requests, BulkWriteOptions options, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.UsingImplicitSessionAsync[TResult](Func`2 funcAsync, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
   --- End of inner exception stack trace ---
   at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
   at MassTransit.MongoDbIntegration.Saga.Context.MongoDbSagaRepositoryContext`2.Insert(TSaga instance)

以及 saga 配置的重要部分:

Event(() => GetOrdersIntegrationEventReceived, e =>
    e
        .CorrelateById(x => x.Message.CorrelationId)
        .SelectId(x => x.Message.CorrelationId)
        .SetSagaFactory(ctx => new GetOrdersState
        {
            CorrelationId = ctx.Message.CorrelationId,
            PartnerId = ctx.Message.PartnerId,
            LastUpdatedBeforeFromRequest = ctx.Message.LastUpdatedBefore
        })
        .InsertOnInitial = true);

我正在尝试做的事情是否可行?

MassTransit 只会为每个 CorrelationId 创建一个传奇实例,因此您的推理是正确的。但是,您的方法有点偏离,可以进行一些调整。

比如你的活动配置:

Event(() => GetOrdersIntegrationEventReceived, e => e.CorrelateById(x => x.Message.CorrelationId));

就是这样,所有额外的东西就是您在调试日志中看到消息的原因。

然后,事件声明后:

During(Initial, Created,
    When(GetOrdersIntegrationEventReceived)
        .Then(context =>
        {
            context.Instance.PartnerId = context.Message.PartnerId,
            context.Instance.LastUpdatedBeforeFromRequest = context.Message.LastUpdatedBefore
        })
        .TransitionTo(Created)
);

就是这样。