MassTransit 中的单身传奇
Singleton saga in MassTransit
我想做的是只创建一个 saga 实例,不被后续事件更新。因此,例如,如果我在 09:00 发送 EventA
,CorrelationId
等于 979158a2-dfa0-45f0-9107-8b8b8028bb9f
,然后该事件的传奇在 10:00 仍然存在,那么如果在 10:00 我发送了另一个 EventA
的实例,它与之前发送的 CorrelationId
相同,我不希望它覆盖之前的传奇状态,而是我希望这个事件是被MT忽略。我有点实现了,因为我的 saga 状态没有被覆盖(只有它的版本增加了),但问题是 MT 在新事件到达时抛出错误。例外是(+上面的一个日志条目):
[18:41:12 DBG] SAGA:App.Services.Communicators.Amazon.Dispatcher.Api.Sagas.GetOrdersState:0368a723-a819-450f-b6e5-9211d1d6a3a9 Dupe App.Services.Financial.Contracts.GetOrdersCommand
MongoDB.Driver.MongoWriteException: A write operation resulted in an error.
E11000 duplicate key error collection: Dispatcher.get.orders.states index: _id_ dup key: { _id: BinData(3, 23A7680319A80F45B6E59211D1D6A3A9) }
---> MongoDB.Driver.MongoBulkWriteException`1[App.Services.Communicators.Amazon.Dispatcher.Api.Sagas.GetOrdersState]: A bulk write operation resulted in one or more errors.
E11000 duplicate key error collection: Dispatcher.get.orders.states index: _id_ dup key: { _id: BinData(3, 23A7680319A80F45B6E59211D1D6A3A9) }
at MongoDB.Driver.MongoCollectionImpl`1.BulkWriteAsync(IClientSessionHandle session, IEnumerable`1 requests, BulkWriteOptions options, CancellationToken cancellationToken)
at MongoDB.Driver.MongoCollectionImpl`1.UsingImplicitSessionAsync[TResult](Func`2 funcAsync, CancellationToken cancellationToken)
at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
--- End of inner exception stack trace ---
at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
at MassTransit.MongoDbIntegration.Saga.Context.MongoDbSagaRepositoryContext`2.Insert(TSaga instance)
以及 saga 配置的重要部分:
Event(() => GetOrdersIntegrationEventReceived, e =>
e
.CorrelateById(x => x.Message.CorrelationId)
.SelectId(x => x.Message.CorrelationId)
.SetSagaFactory(ctx => new GetOrdersState
{
CorrelationId = ctx.Message.CorrelationId,
PartnerId = ctx.Message.PartnerId,
LastUpdatedBeforeFromRequest = ctx.Message.LastUpdatedBefore
})
.InsertOnInitial = true);
我正在尝试做的事情是否可行?
MassTransit 只会为每个 CorrelationId
创建一个传奇实例,因此您的推理是正确的。但是,您的方法有点偏离,可以进行一些调整。
比如你的活动配置:
Event(() => GetOrdersIntegrationEventReceived, e => e.CorrelateById(x => x.Message.CorrelationId));
就是这样,所有额外的东西就是您在调试日志中看到消息的原因。
然后,事件声明后:
During(Initial, Created,
When(GetOrdersIntegrationEventReceived)
.Then(context =>
{
context.Instance.PartnerId = context.Message.PartnerId,
context.Instance.LastUpdatedBeforeFromRequest = context.Message.LastUpdatedBefore
})
.TransitionTo(Created)
);
就是这样。
我想做的是只创建一个 saga 实例,不被后续事件更新。因此,例如,如果我在 09:00 发送 EventA
,CorrelationId
等于 979158a2-dfa0-45f0-9107-8b8b8028bb9f
,然后该事件的传奇在 10:00 仍然存在,那么如果在 10:00 我发送了另一个 EventA
的实例,它与之前发送的 CorrelationId
相同,我不希望它覆盖之前的传奇状态,而是我希望这个事件是被MT忽略。我有点实现了,因为我的 saga 状态没有被覆盖(只有它的版本增加了),但问题是 MT 在新事件到达时抛出错误。例外是(+上面的一个日志条目):
[18:41:12 DBG] SAGA:App.Services.Communicators.Amazon.Dispatcher.Api.Sagas.GetOrdersState:0368a723-a819-450f-b6e5-9211d1d6a3a9 Dupe App.Services.Financial.Contracts.GetOrdersCommand
MongoDB.Driver.MongoWriteException: A write operation resulted in an error.
E11000 duplicate key error collection: Dispatcher.get.orders.states index: _id_ dup key: { _id: BinData(3, 23A7680319A80F45B6E59211D1D6A3A9) }
---> MongoDB.Driver.MongoBulkWriteException`1[App.Services.Communicators.Amazon.Dispatcher.Api.Sagas.GetOrdersState]: A bulk write operation resulted in one or more errors.
E11000 duplicate key error collection: Dispatcher.get.orders.states index: _id_ dup key: { _id: BinData(3, 23A7680319A80F45B6E59211D1D6A3A9) }
at MongoDB.Driver.MongoCollectionImpl`1.BulkWriteAsync(IClientSessionHandle session, IEnumerable`1 requests, BulkWriteOptions options, CancellationToken cancellationToken)
at MongoDB.Driver.MongoCollectionImpl`1.UsingImplicitSessionAsync[TResult](Func`2 funcAsync, CancellationToken cancellationToken)
at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
--- End of inner exception stack trace ---
at MongoDB.Driver.MongoCollectionBase`1.InsertOneAsync(TDocument document, InsertOneOptions options, Func`3 bulkWriteAsync)
at MassTransit.MongoDbIntegration.Saga.Context.MongoDbSagaRepositoryContext`2.Insert(TSaga instance)
以及 saga 配置的重要部分:
Event(() => GetOrdersIntegrationEventReceived, e =>
e
.CorrelateById(x => x.Message.CorrelationId)
.SelectId(x => x.Message.CorrelationId)
.SetSagaFactory(ctx => new GetOrdersState
{
CorrelationId = ctx.Message.CorrelationId,
PartnerId = ctx.Message.PartnerId,
LastUpdatedBeforeFromRequest = ctx.Message.LastUpdatedBefore
})
.InsertOnInitial = true);
我正在尝试做的事情是否可行?
MassTransit 只会为每个 CorrelationId
创建一个传奇实例,因此您的推理是正确的。但是,您的方法有点偏离,可以进行一些调整。
比如你的活动配置:
Event(() => GetOrdersIntegrationEventReceived, e => e.CorrelateById(x => x.Message.CorrelationId));
就是这样,所有额外的东西就是您在调试日志中看到消息的原因。
然后,事件声明后:
During(Initial, Created,
When(GetOrdersIntegrationEventReceived)
.Then(context =>
{
context.Instance.PartnerId = context.Message.PartnerId,
context.Instance.LastUpdatedBeforeFromRequest = context.Message.LastUpdatedBefore
})
.TransitionTo(Created)
);
就是这样。