MassTransit 中间件:OnMissingInstance 等同于没有 Automatonymous 的 Saga

MassTransit Middleware: OnMissingInstance equivalent for Saga without Automatonymous

我试图通过遵循与 NServiceBus 中 Sagas 实现方式类似的模式,在不使用自动状态机(我开始使用它但发现很难正确进行单元测试)的情况下手动启动 Saga。

但是,我遇到了一个问题,即在初始消息创建实例之前,一条消息就进入了 saga,这会导致 MassTransit 默默地吞下消息,而不会抛出异常或将消息移至错误排队。

在尝试找出解决这个问题的同时,很多人建议使用 OnMissingInstance 来错误消息,然后依靠重试框架有效地延迟它,直到传奇被正确初始化. See Here.

我想知道是否有一种方法可以在不使用 Automatonymous 框架的情况下做到这一点,最有可能通过利用一些中间件来先发制人地检查以确保消息存在 Saga(抛出异常,如果不存在则稍后重试)在它尝试处理它之前?如果不是,这听起来像是 useful/is 可能的事情吗?

更多信息:

-使用 Azure 服务总线
-MongoDB 用于传奇持久性。

一些代码简化了我要实现的代码片段:

ec.Saga(new MongoDbSagaRepository<CodeProviderSaga>(sagaDatabase, new MongoDbSagaConsumeContextFactory(), nameof(CodeProviderSaga)), config =>
{
    config.UseApplicationInsights(telemetryClient);
    config.UseRetry(retryConfig => retryConfig.Exponential(10, TimeSpan.FromMilliseconds(500), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(1)));
});
public class CodeProviderSaga :
    InitiatedBy<FirstEvent>,
    Orchestrates<SecondEvent>,
    IVersionedSaga
{
    [BsonId]
    public Guid CorrelationId { get; set; }
    public int Version { get; set; }
    public bool SecondEventRecieved { get; set; }
    public DateTimeOffset? LastUpdated { get; set; }

    public Task Consume(ConsumeContext<FirstEvent> context)
    {
        this.LastUpdated = DateTimeOffset.UtcNow;
        return Task.Completed;
    }

    // An exception should be thrown if this is received first so it can be retried but it is silently dropped atm
    public Task Consume(ConsumeContext<SecondEvent> context)
    {
        this.SecondEventRecieved = true;
        this.LastUpdated = DateTimeOffset.UtcNow;

        return Task.Completed;
    }
}

为了解决这个问题,刚刚向 develop 分支添加了一个提交,它会为没有在 Orchestrates<T> 接口中指定的匹配 saga 的消息抛出异常。