如何为 MassTransit sagas 配置重试

How to configure retry for MassTransit sagas

我遇到了 MassTransit sagas 的并发问题。

我目前正在使用此流程进行 POC:

  1. 一个线程产生 100 个事件,这些事件按顺序发布到 MassTransit。
  2. 当 saga 实例化时,它会向 MassTransit 发布另一个事件。
  3. 新事件由执行某些业务逻辑的消费者拾取,并将两个结果事件之一发布到 MassTransit。
  4. 第 3 步产生的事件触发 saga 中的状态更改。

在第 4 步中,我有时会遇到如下异常:Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. 发生该异常时,状态更改不会被持久化。

这是业务逻辑代码:

/// <summary>Triggers a saga instance (and a new external check); correlated by CorrelationId.</summary>
public interface IInitialSagaEvent : CorrelatedBy<Guid>
{
}

/// <summary>Asks a consumer to run the external check for the correlated saga instance.</summary>
public interface IExternalCheckRequest : CorrelatedBy<Guid>
{
}

/// <summary>Result event published when the external check passes.</summary>
public interface IExternalCheckOk : CorrelatedBy<Guid>
{
}

/// <summary>Result event published when the external check does not pass.</summary>
public interface IExternalCheckNotOk : CorrelatedBy<Guid>
{
}

/// <summary>
/// Persisted state of one saga instance.
/// </summary>
public class MySaga : SagaStateMachineInstance
{
    /// <summary>Identifier that correlated messages are matched against.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Name of the instance's current state machine state.</summary>
    public string CurrentState { get; set; }

    /// <summary>Optimistic-concurrency token (mapped with IsRowVersion in the class map).</summary>
    public byte[] RowVersion { get; set; }
}

/// <summary>
/// State machine for <see cref="MySaga"/>: an <see cref="IInitialSagaEvent"/> publishes an
/// <see cref="IExternalCheckRequest"/> and waits for <see cref="IExternalCheckOk"/> or
/// <see cref="IExternalCheckNotOk"/>. Once a check has finished, a further
/// <see cref="IInitialSagaEvent"/> starts a new check.
/// </summary>
public class MyStateMachine : MassTransitStateMachine<MySaga>
{
    public MyStateMachine()
    {
        // Store the current state's name in MySaga.CurrentState.
        InstanceState(instance => instance.CurrentState);

        Initially(
            When(InitialSagaEvent)
                // PublishAsync publishes through the current ConsumeContext, so the
                // bus-level in-memory outbox still applies (no manual GetPayload needed).
                .PublishAsync(context => context.Init<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck));

        During(AwaitingExternalCheck,
            // A check is already in flight; a repeated trigger is dropped.
            Ignore(InitialSagaEvent),
            When(ExternalCheckOk)
                .TransitionTo(CheckedOk),
            When(ExternalCheckNotOk)
                .TransitionTo(CheckedNotOk));

        // Both terminal outcomes react identically to a new trigger, so they share one handler.
        During(CheckedOk, CheckedNotOk,
            When(InitialSagaEvent)
                .PublishAsync(context => context.Init<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck));
    }

    // Events and states have private setters and are not assigned here; the
    // MassTransitStateMachine base class initializes them.
    public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }
    public Event<IExternalCheckOk> ExternalCheckOk { get; private set; }
    public Event<IExternalCheckNotOk> ExternalCheckNotOk { get; private set; }
    public State AwaitingExternalCheck { get; private set; }
    public State CheckedOk { get; private set; }
    public State CheckedNotOk { get; private set; }
}

/// <summary>
/// Runs the external check for an <see cref="IExternalCheckRequest"/> and publishes either
/// <see cref="IExternalCheckOk"/> or <see cref="IExternalCheckNotOk"/> carrying the request's
/// correlation id.
/// </summary>
public class ExternalCheckRequestConsumer : IConsumer<IExternalCheckRequest>
{
    private readonly IExternalChecker _checker;

    public ExternalCheckRequestConsumer(IExternalChecker externalChecker) => _checker = externalChecker;

    public async Task Consume(ConsumeContext<IExternalCheckRequest> context)
    {
        var token = context.CancellationToken;
        var passed = await _checker.PerformCheck(context.Message, token);

        // Echo the request's correlation id so the result is routed to the same saga instance.
        var result = new { context.Message.CorrelationId };

        if (!passed)
        {
            await context.Publish<IExternalCheckNotOk>(result, token);
            return;
        }

        await context.Publish<IExternalCheckOk>(result, token);
    }
}

/// <summary>
/// Abstraction over the external check performed by <c>ExternalCheckRequestConsumer</c>.
/// </summary>
public interface IExternalChecker
{
    /// <summary>Performs the external check for a request.</summary>
    /// <param name="request">The request being checked.</param>
    /// <param name="cancellationToken">Token used to cancel the check.</param>
    /// <returns><c>true</c> if the check passed (an OK event is published); otherwise <c>false</c>.</returns>
    Task<bool> PerformCheck(IExternalCheckRequest request, CancellationToken cancellationToken);
}

/// <summary>
/// Publishes <see cref="IInitialSagaEvent"/> messages that drive the saga.
/// </summary>
public class Publisher
{
    private readonly IPublishEndpoint _endpoint;

    public Publisher(IPublishEndpoint publishEndpoint) => _endpoint = publishEndpoint;

    /// <summary>Publishes an initial saga event for the given correlation id.</summary>
    /// <param name="correlationId">Identifies the saga instance to start or re-trigger.</param>
    /// <param name="cancellationToken">Token passed through to the publish call.</param>
    public async Task Publish(Guid correlationId, CancellationToken cancellationToken) =>
        await _endpoint.Publish<IInitialSagaEvent>(new { CorrelationId = correlationId }, cancellationToken);
}

以下是配置代码:

/// <summary>
/// EF Core context used by the saga repository; exposes <see cref="MySagaClassMap"/>.
/// </summary>
public class MySagaDbContext : SagaDbContext
{
    public MySagaDbContext(DbContextOptions<MySagaDbContext> options)
        : base(options)
    {
    }

    /// <summary>Saga class maps this context supplies to the SagaDbContext base class.</summary>
    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new MySagaClassMap(); }
    }
}

/// <summary>
/// EF Core column mapping for <see cref="MySaga"/>.
/// </summary>
public class MySagaClassMap : SagaClassMap<MySaga>
{
    protected override void Configure(EntityTypeBuilder<MySaga> entity, ModelBuilder model)
    {
        // Limit the state-name column to 128 characters.
        entity.Property(saga => saga.CurrentState).HasMaxLength(128);

        // Concurrency token: an UPDATE whose row version no longer matches affects 0 rows,
        // which EF Core reports as DbUpdateConcurrencyException.
        entity.Property(saga => saga.RowVersion).IsRowVersion();
    }
}

/// <summary>
/// Endpoint configuration for <see cref="ExternalCheckRequestConsumer"/>: retries a message that
/// fails with an EF Core concurrency conflict or a SQL Server duplicate-key error.
/// </summary>
/// <remarks>
/// Uses MassTransit's message-level <c>UseMessageRetry</c> rather than the generic GreenPipes
/// <c>UseRetry</c> filter. A consumer definition configures only this consumer's endpoint; it
/// does not apply to the saga. NOTE(review): the consumer as shown does not use EF Core itself,
/// so these exception handlers presumably only matter if PerformCheck touches the database.
/// </remarks>
public class ExternalCheckRequestConsumerDefinition : ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator) =>
        endpointConfigurator.UseMessageRetry(r =>
        {
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key (PRIMARY KEY / UNIQUE constraint violation)
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            // At most 5 retries, 100 ms apart.
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
}

/// <summary>
/// Entry point for the POC: wires EF Core saga persistence and MassTransit over ActiveMQ
/// Artemis, starts the bus, runs the POC and stops the bus again.
/// </summary>
public class Program
{
    public static async Task Main(string[] args)
    {
        var services = new ServiceCollection();
        services.AddDbContext<DbContext, MySagaDbContext>((provider, builder)
                => builder.UseSqlServer(connectionString, m =>
                {
                    m.MigrationsAssembly(typeof(MySagaDbContext).Assembly.GetName().Name);
                    m.MigrationsHistoryTable("__EFMigrationsHistory_Sagas");
                }));
        services.AddMassTransit(configureMassTransit =>
        {
            configureMassTransit.AddConsumer<ExternalCheckRequestConsumer, ExternalCheckRequestConsumerDefinition>();
            configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>()
                .EntityFrameworkRepository(r =>
                {
                    r.ConcurrencyMode = ConcurrencyMode.Optimistic;
                    r.ExistingDbContext<MySagaDbContext>();
                });
            configureMassTransit.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(true));
            configureMassTransit.UsingActiveMq((context, config) =>
            {
                config.Host("artemis", 61616, configureHost =>
                {
                    configureHost.Username("admin");
                    configureHost.Password("admin");
                });

                config.UseInMemoryOutbox(); // ref https://masstransit-project.com/articles/outbox.html#the-in-memory-outbox
                config.EnableArtemisCompatibility();
                config.ConfigureEndpoints(context);
            });
        });

        // Dispose the root provider (and the singletons it owns) when Main exits.
        await using var serviceProvider = services.BuildServiceProvider();
        var busControl = serviceProvider.GetRequiredService<IBusControl>();
        await busControl.StartAsync();
        try
        {
            await RunPoc(serviceProvider);
        }
        finally
        {
            // Stop the bus even if the POC throws, so transport connections are closed cleanly.
            await busControl.StopAsync();
        }
    }

    /// <summary>Placeholder for the POC body (publishing the test events is not shown).</summary>
    private static async Task RunPoc(IServiceProvider serviceProvider)
    {
        await Task.CompletedTask;
    }

    // Redacted placeholder; supply the real SQL Server connection string.
    private static readonly string connectionString = string.Empty;
}

我猜测需要在正确的位置加入 UseRetry,所以尝试像下面这样在 AddSagaStateMachine 中配置 UseRetry:

// Attempted configuration that does NOT work: the generic UseRetry filter on the saga configurator.
// In the resulting stack trace the RetryFilter wraps SagaMergePipe directly, i.e. only the saga
// step is retried rather than the whole message; the working version uses UseMessageRetry instead.
configureMassTransit.AddSagaStateMachine<MyStateMachine, MySaga>(
    configure =>
    {
        configure.UseRetry(r =>
        {
            r.Handle<DbUpdateConcurrencyException>();
            // This is the SQLServer error code for duplicate key
            r.Handle<DbUpdateException>(y => y.InnerException is SqlException e && e.Number == 2627);
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    })
    .EntityFrameworkRepository(r =>
    {
        r.ConcurrencyMode = ConcurrencyMode.Optimistic;
        r.ExistingDbContext<MySagaDbContext>();
    });

但在 AddSagaStateMachine 中这样使用 UseRetry 并没有效果,反而出现了大量如下异常:

fail: MassTransit.ReceiveTransport[0]
R - FAULT activemq://artemis:61616/XXXX
System.ArgumentException: THe message could not be retrieved: IInitialSagaEvent(Parameter 'context')
at MassTransit.Saga.Pipeline.Pipes.SagaMergePipe`2.Send(SagaConsumeContext`1 context)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass5_0`1.<< Send > b__1 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass8_0.<< WithinTransaction > g__Create | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)

我使用的是 .NET 6,并且尝试过 MassTransit v7.3.1 和 v8.0.0-develop.391,两者的表现相同。

我尝试过将消息定义为接口,并分别以匿名类型和具体实现类的形式发布;也尝试过将消息定义为类,但都没有成功。

我希望我只是遗漏了一些小的配置细节,但我没有想法,所以非常感谢任何帮助。

在定义(definition)中,正确的重试配置如下所示。注意这里使用的是 UseMessageRetry,而不是 UseRetry:

/// <summary>
/// Consumer endpoint definition with a message-level retry policy for EF Core concurrency
/// conflicts and SQL Server duplicate-key errors.
/// </summary>
public class ExternalCheckRequestConsumerDefinition :
    ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();
            // SQL Server error 2627: duplicate key (PRIMARY KEY / UNIQUE constraint violation).
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sqlError && sqlError.Number == 2627);
            // Up to 5 retries, 100 ms apart.
            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    }
}

更新

上面的 Consumer 定义不会被 saga 使用。您需要创建一个 saga 定义(SagaDefinition),并在添加 saga 时指定它,重试才会应用到 saga 上。这与在添加 saga 时进行内联配置是等效的:

.AddSagaStateMachine<MyStateMachine, MySaga, MySagaDefinition>(

此外,在您的状态机中,可以将以下过于冗长的写法:

.ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))

替换为:

.PublishAsync(context => context.Init<IExternalCheckRequest>(new { context.Instance.CorrelationId }))

这是我最终使用的 .AddSagaStateMachine 配置,参考了另一个答案中 Chris Patterson 的解决方案:

// Register the saga with a message-level retry policy so that an optimistic-concurrency conflict
// (or a duplicate-key insert) re-runs the whole message against freshly loaded saga state.
configureMassTransit
    .AddSagaStateMachine<MyStateMachine, MySaga>(sagaConfigurator =>
        sagaConfigurator.UseMessageRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();
            // SQL Server error 2627: duplicate key (PRIMARY KEY / UNIQUE constraint violation).
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sqlError && sqlError.Number == 2627);
            // Up to 5 retries, 100 ms apart.
            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        }))
    .EntityFrameworkRepository(repository =>
    {
        repository.ConcurrencyMode = ConcurrencyMode.Optimistic;
        repository.ExistingDbContext<MySagaDbContext>();
    });