如何为 MassTransit sagas 配置重试
How to configure retry for MassTransit sagas
我遇到了 MassTransit sagas 的并发问题。
我目前正在使用此流程进行 POC:
- 一个线程产生 100 个事件,这些事件按顺序发布到 MassTransit。
- 当 saga 实例化时,它会向 MassTransit 发布另一个事件。
- 新事件由执行某些业务逻辑的消费者拾取,并将两个结果事件之一发布到 MassTransit。
- 第 3 步产生的事件触发 saga 中的状态更改
我有时会在步骤 4 中遇到这样的异常:Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded.
出现该异常时,状态更改不会被持久化。
这是业务逻辑代码:
/// <summary>
/// Event handled by <c>MyStateMachine</c> to create a saga or to request a new external check.
/// </summary>
public interface IInitialSagaEvent : CorrelatedBy<Guid>
{
}

/// <summary>
/// Request published by the saga and consumed by <c>ExternalCheckRequestConsumer</c>.
/// </summary>
public interface IExternalCheckRequest : CorrelatedBy<Guid>
{
}

/// <summary>
/// Result published by the consumer when the external check returns <c>true</c>.
/// </summary>
public interface IExternalCheckOk : CorrelatedBy<Guid>
{
}

/// <summary>
/// Result published by the consumer when the external check returns <c>false</c>.
/// </summary>
public interface IExternalCheckNotOk : CorrelatedBy<Guid>
{
}
/// <summary>
/// Saga instance data used by <c>MyStateMachine</c>.
/// </summary>
public class MySaga : SagaStateMachineInstance
{
    /// <summary>Identifier shared by the saga instance and the messages correlated to it.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state; the state machine selects this property via <c>InstanceState</c>.</summary>
    public string CurrentState { get; set; }

    /// <summary>Row version value; configured with <c>IsRowVersion()</c> in <c>MySagaClassMap</c>.</summary>
    public byte[] RowVersion { get; set; }
}
/// <summary>
/// State machine for <see cref="MySaga"/>: an initial event triggers an external check request,
/// and the check result moves the saga to <c>CheckedOk</c> or <c>CheckedNotOk</c>.
/// </summary>
public class MyStateMachine : MassTransitStateMachine<MySaga>
{
    public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }
    public Event<IExternalCheckOk> ExternalCheckOk { get; private set; }
    public Event<IExternalCheckNotOk> ExternalCheckNotOk { get; private set; }

    public State AwaitingExternalCheck { get; private set; }
    public State CheckedOk { get; private set; }
    public State CheckedNotOk { get; private set; }

    public MyStateMachine()
    {
        InstanceState(saga => saga.CurrentState);

        // First event for a correlation id: publish the check request and wait for the result.
        Initially(
            When(InitialSagaEvent)
                .ThenAsync(ctx => ctx.GetPayload<ConsumeContext>()
                    .Publish<IExternalCheckRequest>(new { ctx.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck));

        // While a check is pending, further initial events are ignored; only a result changes state.
        During(AwaitingExternalCheck,
            Ignore(InitialSagaEvent),
            When(ExternalCheckOk)
                .TransitionTo(CheckedOk),
            When(ExternalCheckNotOk)
                .TransitionTo(CheckedNotOk));

        // After a successful check, a new initial event starts another check.
        During(CheckedOk,
            When(InitialSagaEvent)
                .ThenAsync(ctx => ctx.GetPayload<ConsumeContext>()
                    .Publish<IExternalCheckRequest>(new { ctx.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck));

        // After a failed check, a new initial event starts another check as well.
        During(CheckedNotOk,
            When(InitialSagaEvent)
                .ThenAsync(ctx => ctx.GetPayload<ConsumeContext>()
                    .Publish<IExternalCheckRequest>(new { ctx.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck));
    }
}
/// <summary>
/// Consumes <see cref="IExternalCheckRequest"/>, runs the external check, and publishes
/// <see cref="IExternalCheckOk"/> or <see cref="IExternalCheckNotOk"/> with the same correlation id.
/// </summary>
public class ExternalCheckRequestConsumer : IConsumer<IExternalCheckRequest>
{
    private readonly IExternalChecker _checker;

    /// <param name="externalChecker">Service that performs the actual check.</param>
    public ExternalCheckRequestConsumer(IExternalChecker externalChecker)
    {
        _checker = externalChecker;
    }

    /// <summary>Performs the check for the received request and publishes the matching result event.</summary>
    /// <param name="context">Consume context carrying the request and its cancellation token.</param>
    public async Task Consume(ConsumeContext<IExternalCheckRequest> context)
    {
        var passed = await _checker.PerformCheck(context.Message, context.CancellationToken);

        // Pick the result event from the check outcome, then await the selected publish.
        var publishing = passed
            ? context.Publish<IExternalCheckOk>(new { context.Message.CorrelationId }, context.CancellationToken)
            : context.Publish<IExternalCheckNotOk>(new { context.Message.CorrelationId }, context.CancellationToken);

        await publishing;
    }
}
/// <summary>
/// Abstraction over the business check invoked by <c>ExternalCheckRequestConsumer</c>.
/// </summary>
public interface IExternalChecker
{
    /// <summary>Runs the check for the given request.</summary>
    /// <param name="request">The check request message.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A task producing the boolean check result.</returns>
    Task<bool> PerformCheck(
        IExternalCheckRequest request,
        CancellationToken cancellationToken);
}
/// <summary>
/// Publishes <see cref="IInitialSagaEvent"/> messages through the injected publish endpoint.
/// </summary>
public class Publisher
{
    private readonly IPublishEndpoint _endpoint;

    /// <param name="publishEndpoint">Endpoint used to publish the events.</param>
    public Publisher(IPublishEndpoint publishEndpoint)
    {
        _endpoint = publishEndpoint;
    }

    /// <summary>Publishes an initial saga event carrying the given correlation id.</summary>
    /// <param name="correlationId">Correlation id placed on the published event.</param>
    /// <param name="cancellationToken">Token passed through to the publish call.</param>
    public async Task Publish(Guid correlationId, CancellationToken cancellationToken)
    {
        var values = new { CorrelationId = correlationId };
        await _endpoint.Publish<IInitialSagaEvent>(values, cancellationToken);
    }
}
这里是配置代码
/// <summary>
/// Saga DbContext whose only saga mapping is <see cref="MySagaClassMap"/>.
/// </summary>
public class MySagaDbContext : SagaDbContext
{
    /// <param name="options">Options forwarded to the base <c>SagaDbContext</c>.</param>
    public MySagaDbContext(DbContextOptions<MySagaDbContext> options)
        : base(options)
    {
    }

    /// <summary>Yields the saga class maps for this context (a single <see cref="MySagaClassMap"/>).</summary>
    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new MySagaClassMap(); }
    }
}
/// <summary>
/// Entity mapping for <see cref="MySaga"/>.
/// </summary>
public class MySagaClassMap : SagaClassMap<MySaga>
{
    /// <summary>Limits <c>CurrentState</c> to 128 characters and marks <c>RowVersion</c> as a row version.</summary>
    protected override void Configure(EntityTypeBuilder<MySaga> entity, ModelBuilder model)
    {
        var currentState = entity.Property(saga => saga.CurrentState);
        currentState.HasMaxLength(128);

        var rowVersion = entity.Property(saga => saga.RowVersion);
        rowVersion.IsRowVersion();
    }
}
/// <summary>
/// Consumer definition for <see cref="ExternalCheckRequestConsumer"/> that adds an interval
/// retry to the consumer's receive endpoint via <c>UseRetry</c>.
/// </summary>
public class ExternalCheckRequestConsumerDefinition : ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();

            // 2627 is the SQL Server error number for a duplicate key.
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sql && sql.Number == 2627);

            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    }
}
/// <summary>
/// POC entry point: registers the saga DbContext and MassTransit (ActiveMQ Artemis transport,
/// EF Core saga repository), starts the bus, and runs the POC routine.
/// </summary>
public class Program
{
    static string connectionString = string.Empty;

    public static async Task Main(string[] args)
    {
        var services = new ServiceCollection();

        // Saga persistence: SQL Server with a dedicated migrations history table.
        services.AddDbContext<DbContext, MySagaDbContext>((provider, options) =>
        {
            options.UseSqlServer(connectionString, sql =>
            {
                sql.MigrationsAssembly(typeof(MySagaDbContext).Assembly.GetName().Name);
                sql.MigrationsHistoryTable("__EFMigrationsHistory_Sagas");
            });
        });

        services.AddMassTransit(bus =>
        {
            bus.AddConsumer<ExternalCheckRequestConsumer, ExternalCheckRequestConsumerDefinition>();

            bus.AddSagaStateMachine<MyStateMachine, MySaga>()
                .EntityFrameworkRepository(repository =>
                {
                    repository.ConcurrencyMode = ConcurrencyMode.Optimistic;
                    repository.ExistingDbContext<MySagaDbContext>();
                });

            bus.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(true));

            bus.UsingActiveMq((context, cfg) =>
            {
                cfg.Host("artemis", 61616, host =>
                {
                    host.Username("admin");
                    host.Password("admin");
                });

                cfg.UseInMemoryOutbox(); // ref https://masstransit-project.com/articles/outbox.html#the-in-memory-outbox
                cfg.EnableArtemisCompatibility();
                cfg.ConfigureEndpoints(context);
            });
        });

        var serviceProvider = services.BuildServiceProvider();

        var busControl = serviceProvider.GetRequiredService<IBusControl>();
        await busControl.StartAsync();

        await RunPoc(serviceProvider);
    }

    // Placeholder for the POC body; currently completes immediately.
    private static async Task RunPoc(IServiceProvider serviceProvider)
    {
        await Task.CompletedTask;
    }
}
我猜测需要在正确的位置加入 UseRetry,所以我尝试像这样在 AddSagaStateMachine 中配置 UseRetry:
// Registers the saga with an inline UseRetry on the saga pipeline and the EF Core repository.
configureMassTransit
    .AddSagaStateMachine<MyStateMachine, MySaga>(sagaConfigurator =>
    {
        sagaConfigurator.UseRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();

            // 2627 is the SQL Server error number for a duplicate key.
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sql && sql.Number == 2627);

            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    })
    .EntityFrameworkRepository(repository =>
    {
        repository.ConcurrencyMode = ConcurrencyMode.Optimistic;
        repository.ExistingDbContext<MySagaDbContext>();
    });
但是在 AddSagaStateMachine 中使用这个 UseRetry 没有任何效果,我只是得到大量这样的异常:
fail: MassTransit.ReceiveTransport[0]
R - FAULT activemq://artemis:61616/XXXX
System.ArgumentException: THe message could not be retrieved: IInitialSagaEvent(Parameter 'context')
at MassTransit.Saga.Pipeline.Pipes.SagaMergePipe`2.Send(SagaConsumeContext`1 context)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass5_0`1.<< Send > b__1 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass8_0.<< WithinTransaction > g__Create | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
我正在使用 .Net 6 并尝试过 MassTransit v 7.3.1 和 v 8.0.0-develop.391,但两者具有相同的行为。
我试过将消息定义为接口,并以匿名类和实际实现的形式发布它们,也试过将消息定义为类,但都没有成功。
我希望自己只是遗漏了某个小的配置细节,但我已经没有头绪了,所以非常感谢任何帮助。
SagaDefinition 中的正确配置如下所示。注意使用的是 UseMessageRetry,而不是 UseRetry。
/// <summary>
/// Consumer definition for <see cref="ExternalCheckRequestConsumer"/> that adds an interval
/// retry to the consumer's receive endpoint via <c>UseMessageRetry</c>.
/// </summary>
public class ExternalCheckRequestConsumerDefinition :
    ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();

            // 2627 is the SQL Server error number for a duplicate key.
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sql && sql.Number == 2627);

            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    }
}
更新
上面的 Consumer 定义不会被 saga 使用。您需要创建一个 Saga 定义,并在添加 saga 时指定它,这样重试才会应用于该 saga。这与在添加 saga 时进行内联配置的效果相同:
.AddSagaStateMachine<MyStateMachine, MySaga, MySagaDefinition>(
此外,在您的状态机中,替换掉过于冗长的:
.ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
改用:
.PublishAsync(context => context.Init<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
这是我使用的 .AddSagaStateMachine 配置,参考了另一个回答中 Chris Patterson 的解决方案。
// Registers the saga with an inline UseMessageRetry and the EF Core repository.
configureMassTransit
    .AddSagaStateMachine<MyStateMachine, MySaga>(sagaConfigurator =>
    {
        sagaConfigurator.UseMessageRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();

            // 2627 is the SQL Server error number for a duplicate key.
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sql && sql.Number == 2627);

            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    })
    .EntityFrameworkRepository(repository =>
    {
        repository.ConcurrencyMode = ConcurrencyMode.Optimistic;
        repository.ExistingDbContext<MySagaDbContext>();
    });
我遇到了 MassTransit sagas 的并发问题。
我目前正在使用此流程进行 POC:
- 一个线程产生 100 个事件,这些事件按顺序发布到 MassTransit。
- 当 saga 实例化时,它会向 MassTransit 发布另一个事件。
- 新事件由执行某些业务逻辑的消费者拾取,并将两个结果事件之一发布到 MassTransit。
- 第 3 步产生的事件触发 saga 中的状态更改
我有时会在步骤 4 中遇到这样的异常:Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded.
出现该异常时,状态更改不会被持久化。
这是业务逻辑代码:
/// <summary>
/// Event handled by <c>MyStateMachine</c> to create a saga or to request a new external check.
/// </summary>
public interface IInitialSagaEvent : CorrelatedBy<Guid>
{
}

/// <summary>
/// Request published by the saga and consumed by <c>ExternalCheckRequestConsumer</c>.
/// </summary>
public interface IExternalCheckRequest : CorrelatedBy<Guid>
{
}

/// <summary>
/// Result published by the consumer when the external check returns <c>true</c>.
/// </summary>
public interface IExternalCheckOk : CorrelatedBy<Guid>
{
}

/// <summary>
/// Result published by the consumer when the external check returns <c>false</c>.
/// </summary>
public interface IExternalCheckNotOk : CorrelatedBy<Guid>
{
}
/// <summary>
/// Saga instance data used by <c>MyStateMachine</c>.
/// </summary>
public class MySaga : SagaStateMachineInstance
{
    /// <summary>Identifier shared by the saga instance and the messages correlated to it.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state; the state machine selects this property via <c>InstanceState</c>.</summary>
    public string CurrentState { get; set; }

    /// <summary>Row version value; configured with <c>IsRowVersion()</c> in <c>MySagaClassMap</c>.</summary>
    public byte[] RowVersion { get; set; }
}
/// <summary>
/// State machine for <see cref="MySaga"/>: an initial event triggers an external check request,
/// and the check result moves the saga to <c>CheckedOk</c> or <c>CheckedNotOk</c>.
/// </summary>
public class MyStateMachine : MassTransitStateMachine<MySaga>
{
    public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }
    public Event<IExternalCheckOk> ExternalCheckOk { get; private set; }
    public Event<IExternalCheckNotOk> ExternalCheckNotOk { get; private set; }

    public State AwaitingExternalCheck { get; private set; }
    public State CheckedOk { get; private set; }
    public State CheckedNotOk { get; private set; }

    public MyStateMachine()
    {
        InstanceState(saga => saga.CurrentState);

        // First event for a correlation id: publish the check request and wait for the result.
        Initially(
            When(InitialSagaEvent)
                .ThenAsync(ctx => ctx.GetPayload<ConsumeContext>()
                    .Publish<IExternalCheckRequest>(new { ctx.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck));

        // While a check is pending, further initial events are ignored; only a result changes state.
        During(AwaitingExternalCheck,
            Ignore(InitialSagaEvent),
            When(ExternalCheckOk)
                .TransitionTo(CheckedOk),
            When(ExternalCheckNotOk)
                .TransitionTo(CheckedNotOk));

        // After a successful check, a new initial event starts another check.
        During(CheckedOk,
            When(InitialSagaEvent)
                .ThenAsync(ctx => ctx.GetPayload<ConsumeContext>()
                    .Publish<IExternalCheckRequest>(new { ctx.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck));

        // After a failed check, a new initial event starts another check as well.
        During(CheckedNotOk,
            When(InitialSagaEvent)
                .ThenAsync(ctx => ctx.GetPayload<ConsumeContext>()
                    .Publish<IExternalCheckRequest>(new { ctx.Instance.CorrelationId }))
                .TransitionTo(AwaitingExternalCheck));
    }
}
/// <summary>
/// Consumes <see cref="IExternalCheckRequest"/>, runs the external check, and publishes
/// <see cref="IExternalCheckOk"/> or <see cref="IExternalCheckNotOk"/> with the same correlation id.
/// </summary>
public class ExternalCheckRequestConsumer : IConsumer<IExternalCheckRequest>
{
    private readonly IExternalChecker _checker;

    /// <param name="externalChecker">Service that performs the actual check.</param>
    public ExternalCheckRequestConsumer(IExternalChecker externalChecker)
    {
        _checker = externalChecker;
    }

    /// <summary>Performs the check for the received request and publishes the matching result event.</summary>
    /// <param name="context">Consume context carrying the request and its cancellation token.</param>
    public async Task Consume(ConsumeContext<IExternalCheckRequest> context)
    {
        var passed = await _checker.PerformCheck(context.Message, context.CancellationToken);

        // Pick the result event from the check outcome, then await the selected publish.
        var publishing = passed
            ? context.Publish<IExternalCheckOk>(new { context.Message.CorrelationId }, context.CancellationToken)
            : context.Publish<IExternalCheckNotOk>(new { context.Message.CorrelationId }, context.CancellationToken);

        await publishing;
    }
}
/// <summary>
/// Abstraction over the business check invoked by <c>ExternalCheckRequestConsumer</c>.
/// </summary>
public interface IExternalChecker
{
    /// <summary>Runs the check for the given request.</summary>
    /// <param name="request">The check request message.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A task producing the boolean check result.</returns>
    Task<bool> PerformCheck(
        IExternalCheckRequest request,
        CancellationToken cancellationToken);
}
/// <summary>
/// Publishes <see cref="IInitialSagaEvent"/> messages through the injected publish endpoint.
/// </summary>
public class Publisher
{
    private readonly IPublishEndpoint _endpoint;

    /// <param name="publishEndpoint">Endpoint used to publish the events.</param>
    public Publisher(IPublishEndpoint publishEndpoint)
    {
        _endpoint = publishEndpoint;
    }

    /// <summary>Publishes an initial saga event carrying the given correlation id.</summary>
    /// <param name="correlationId">Correlation id placed on the published event.</param>
    /// <param name="cancellationToken">Token passed through to the publish call.</param>
    public async Task Publish(Guid correlationId, CancellationToken cancellationToken)
    {
        var values = new { CorrelationId = correlationId };
        await _endpoint.Publish<IInitialSagaEvent>(values, cancellationToken);
    }
}
这里是配置代码
/// <summary>
/// Saga DbContext whose only saga mapping is <see cref="MySagaClassMap"/>.
/// </summary>
public class MySagaDbContext : SagaDbContext
{
    /// <param name="options">Options forwarded to the base <c>SagaDbContext</c>.</param>
    public MySagaDbContext(DbContextOptions<MySagaDbContext> options)
        : base(options)
    {
    }

    /// <summary>Yields the saga class maps for this context (a single <see cref="MySagaClassMap"/>).</summary>
    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new MySagaClassMap(); }
    }
}
/// <summary>
/// Entity mapping for <see cref="MySaga"/>.
/// </summary>
public class MySagaClassMap : SagaClassMap<MySaga>
{
    /// <summary>Limits <c>CurrentState</c> to 128 characters and marks <c>RowVersion</c> as a row version.</summary>
    protected override void Configure(EntityTypeBuilder<MySaga> entity, ModelBuilder model)
    {
        var currentState = entity.Property(saga => saga.CurrentState);
        currentState.HasMaxLength(128);

        var rowVersion = entity.Property(saga => saga.RowVersion);
        rowVersion.IsRowVersion();
    }
}
/// <summary>
/// Consumer definition for <see cref="ExternalCheckRequestConsumer"/> that adds an interval
/// retry to the consumer's receive endpoint via <c>UseRetry</c>.
/// </summary>
public class ExternalCheckRequestConsumerDefinition : ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();

            // 2627 is the SQL Server error number for a duplicate key.
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sql && sql.Number == 2627);

            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    }
}
/// <summary>
/// POC entry point: registers the saga DbContext and MassTransit (ActiveMQ Artemis transport,
/// EF Core saga repository), starts the bus, and runs the POC routine.
/// </summary>
public class Program
{
    static string connectionString = string.Empty;

    public static async Task Main(string[] args)
    {
        var services = new ServiceCollection();

        // Saga persistence: SQL Server with a dedicated migrations history table.
        services.AddDbContext<DbContext, MySagaDbContext>((provider, options) =>
        {
            options.UseSqlServer(connectionString, sql =>
            {
                sql.MigrationsAssembly(typeof(MySagaDbContext).Assembly.GetName().Name);
                sql.MigrationsHistoryTable("__EFMigrationsHistory_Sagas");
            });
        });

        services.AddMassTransit(bus =>
        {
            bus.AddConsumer<ExternalCheckRequestConsumer, ExternalCheckRequestConsumerDefinition>();

            bus.AddSagaStateMachine<MyStateMachine, MySaga>()
                .EntityFrameworkRepository(repository =>
                {
                    repository.ConcurrencyMode = ConcurrencyMode.Optimistic;
                    repository.ExistingDbContext<MySagaDbContext>();
                });

            bus.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(true));

            bus.UsingActiveMq((context, cfg) =>
            {
                cfg.Host("artemis", 61616, host =>
                {
                    host.Username("admin");
                    host.Password("admin");
                });

                cfg.UseInMemoryOutbox(); // ref https://masstransit-project.com/articles/outbox.html#the-in-memory-outbox
                cfg.EnableArtemisCompatibility();
                cfg.ConfigureEndpoints(context);
            });
        });

        var serviceProvider = services.BuildServiceProvider();

        var busControl = serviceProvider.GetRequiredService<IBusControl>();
        await busControl.StartAsync();

        await RunPoc(serviceProvider);
    }

    // Placeholder for the POC body; currently completes immediately.
    private static async Task RunPoc(IServiceProvider serviceProvider)
    {
        await Task.CompletedTask;
    }
}
我猜测需要在正确的位置加入 UseRetry,所以我尝试像这样在 AddSagaStateMachine 中配置 UseRetry:
// Registers the saga with an inline UseRetry on the saga pipeline and the EF Core repository.
configureMassTransit
    .AddSagaStateMachine<MyStateMachine, MySaga>(sagaConfigurator =>
    {
        sagaConfigurator.UseRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();

            // 2627 is the SQL Server error number for a duplicate key.
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sql && sql.Number == 2627);

            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    })
    .EntityFrameworkRepository(repository =>
    {
        repository.ConcurrencyMode = ConcurrencyMode.Optimistic;
        repository.ExistingDbContext<MySagaDbContext>();
    });
但是在 AddSagaStateMachine 中使用这个 UseRetry 没有任何效果,我只是得到大量这样的异常:
fail: MassTransit.ReceiveTransport[0]
R - FAULT activemq://artemis:61616/XXXX
System.ArgumentException: THe message could not be retrieved: IInitialSagaEvent(Parameter 'context')
at MassTransit.Saga.Pipeline.Pipes.SagaMergePipe`2.Send(SagaConsumeContext`1 context)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass5_0`1.<< Send > b__1 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.<> c__DisplayClass8_0.<< WithinTransaction > g__Create | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.WithinTransaction[T](DbContext context, CancellationToken cancellationToken, Func`1 callback)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.EntityFrameworkCoreIntegration.Saga.Context.EntityFrameworkSagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory`1.<> c__DisplayClass6_0`1.<< Send > g__CreateScope | 0 > d.MoveNext()
-- - End of stack trace from previous location ---
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
我正在使用 .Net 6 并尝试过 MassTransit v 7.3.1 和 v 8.0.0-develop.391,但两者具有相同的行为。
我试过将消息定义为接口,并以匿名类和实际实现的形式发布它们,也试过将消息定义为类,但都没有成功。
我希望自己只是遗漏了某个小的配置细节,但我已经没有头绪了,所以非常感谢任何帮助。
SagaDefinition 中的正确配置如下所示。注意使用的是 UseMessageRetry,而不是 UseRetry。
/// <summary>
/// Consumer definition for <see cref="ExternalCheckRequestConsumer"/> that adds an interval
/// retry to the consumer's receive endpoint via <c>UseMessageRetry</c>.
/// </summary>
public class ExternalCheckRequestConsumerDefinition :
    ConsumerDefinition<ExternalCheckRequestConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ExternalCheckRequestConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();

            // 2627 is the SQL Server error number for a duplicate key.
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sql && sql.Number == 2627);

            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    }
}
更新
上面的 Consumer 定义不会被 saga 使用。您需要创建一个 Saga 定义,并在添加 saga 时指定它,这样重试才会应用于该 saga。这与在添加 saga 时进行内联配置的效果相同:
.AddSagaStateMachine<MyStateMachine, MySaga, MySagaDefinition>(
此外,在您的状态机中,替换掉过于冗长的:
.ThenAsync(context => context.GetPayload<ConsumeContext>().Publish<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
改用:
.PublishAsync(context => context.Init<IExternalCheckRequest>(new { context.Instance.CorrelationId }))
这是我使用的 .AddSagaStateMachine 配置,参考了另一个回答中 Chris Patterson 的解决方案。
// Registers the saga with an inline UseMessageRetry and the EF Core repository.
configureMassTransit
    .AddSagaStateMachine<MyStateMachine, MySaga>(sagaConfigurator =>
    {
        sagaConfigurator.UseMessageRetry(retry =>
        {
            retry.Handle<DbUpdateConcurrencyException>();

            // 2627 is the SQL Server error number for a duplicate key.
            retry.Handle<DbUpdateException>(ex => ex.InnerException is SqlException sql && sql.Number == 2627);

            retry.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    })
    .EntityFrameworkRepository(repository =>
    {
        repository.ConcurrencyMode = ConcurrencyMode.Optimistic;
        repository.ExistingDbContext<MySagaDbContext>();
    });