MassTransit 中间件:OnMissingInstance 等同于没有 Automatonymous 的 Saga
MassTransit Middleware: OnMissingInstance equivalent for Saga without Automatonymous
我试图通过遵循与 NServiceBus 中 Sagas 实现方式类似的模式,在不使用自动状态机(我开始使用它但发现很难正确进行单元测试)的情况下手动启动 Saga。
但是,我遇到了一个问题,即在初始消息创建实例之前,一条消息就进入了 saga,这会导致 MassTransit 默默地吞下消息,而不会抛出异常或将消息移至错误排队。
在尝试找出解决这个问题的同时,很多人建议使用 OnMissingInstance
来错误消息,然后依靠重试框架有效地延迟它,直到传奇被正确初始化.
See Here.
我想知道是否有一种方法可以在不使用 Automatonymous 框架的情况下做到这一点,最有可能通过利用一些中间件来先发制人地检查以确保消息存在 Saga(抛出异常,如果不存在则稍后重试)在它尝试处理它之前?如果不是,这听起来像是 useful/is 可能的事情吗?
更多信息:
-使用 Azure 服务总线
-MongoDB 用于传奇持久性。
一些代码简化了我要实现的代码片段:
ec.Saga(new MongoDbSagaRepository<CodeProviderSaga>(sagaDatabase, new MongoDbSagaConsumeContextFactory(), nameof(CodeProviderSaga)), config =>
{
config.UseApplicationInsights(telemetryClient);
config.UseRetry(retryConfig => retryConfig.Exponential(10, TimeSpan.FromMilliseconds(500), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(1)));
});
public class CodeProviderSaga :
InitiatedBy<FirstEvent>,
Orchestrates<SecondEvent>,
IVersionedSaga
{
[BsonId]
public Guid CorrelationId { get; set; }
public int Version { get; set; }
public bool SecondEventRecieved { get; set; }
public DateTimeOffset? LastUpdated { get; set; }
public Task Consume(ConsumeContext<FirstEvent> context)
{
this.LastUpdated = DateTimeOffset.UtcNow;
return Task.Completed;
}
// An exception should be thrown if this is received first so it can be retried but it is silently dropped atm
public Task Consume(ConsumeContext<SecondEvent> context)
{
this.SecondEventRecieved = true;
this.LastUpdated = DateTimeOffset.UtcNow;
return Task.Completed;
}
}
为了解决这个问题,刚刚向 develop 分支添加了一个提交,它会为没有在 Orchestrates<T>
接口中指定的匹配 saga 的消息抛出异常。
我试图通过遵循与 NServiceBus 中 Sagas 实现方式类似的模式,在不使用自动状态机(我开始使用它但发现很难正确进行单元测试)的情况下手动启动 Saga。
但是,我遇到了一个问题,即在初始消息创建实例之前,一条消息就进入了 saga,这会导致 MassTransit 默默地吞下消息,而不会抛出异常或将消息移至错误排队。
在尝试找出解决这个问题的同时,很多人建议使用 OnMissingInstance
来错误消息,然后依靠重试框架有效地延迟它,直到传奇被正确初始化.
See Here.
我想知道是否有一种方法可以在不使用 Automatonymous 框架的情况下做到这一点,最有可能通过利用一些中间件来先发制人地检查以确保消息存在 Saga(抛出异常,如果不存在则稍后重试)在它尝试处理它之前?如果不是,这听起来像是 useful/is 可能的事情吗?
更多信息:
-使用 Azure 服务总线
-MongoDB 用于传奇持久性。
一些代码简化了我要实现的代码片段:
ec.Saga(new MongoDbSagaRepository<CodeProviderSaga>(sagaDatabase, new MongoDbSagaConsumeContextFactory(), nameof(CodeProviderSaga)), config =>
{
config.UseApplicationInsights(telemetryClient);
config.UseRetry(retryConfig => retryConfig.Exponential(10, TimeSpan.FromMilliseconds(500), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(1)));
});
public class CodeProviderSaga :
InitiatedBy<FirstEvent>,
Orchestrates<SecondEvent>,
IVersionedSaga
{
[BsonId]
public Guid CorrelationId { get; set; }
public int Version { get; set; }
public bool SecondEventRecieved { get; set; }
public DateTimeOffset? LastUpdated { get; set; }
public Task Consume(ConsumeContext<FirstEvent> context)
{
this.LastUpdated = DateTimeOffset.UtcNow;
return Task.Completed;
}
// An exception should be thrown if this is received first so it can be retried but it is silently dropped atm
public Task Consume(ConsumeContext<SecondEvent> context)
{
this.SecondEventRecieved = true;
this.LastUpdated = DateTimeOffset.UtcNow;
return Task.Completed;
}
}
为了解决这个问题,刚刚向 develop 分支添加了一个提交,它会为没有在 Orchestrates<T>
接口中指定的匹配 saga 的消息抛出异常。