如何在 MassTransit 中配置多次重试策略
How to configure multiple retry policies in MassTransit
我想要针对特定消费者的两个单独的重试策略。一个用于 HttpRequestException
和 SocketException
,另一个用于自定义 DatabaseException
和未处理的 SqlException
。我想这样做是因为我想为两者设置单独的指数重试间隔。
我有以下配置:
cfg.ReceiveEndpoint(
host,
"ExampleQueueName",
ec =>
{
ec.Consumer<ExampleConsumer>(context);
ec.EnablePriority(5);
ec.UseRetry(retryConfig =>
{
// exponential backup in minutes
retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
retryConfig.Handle<HttpRequestException>(x => x.IsTransient());
retryConfig.Handle<SocketException>(x => x.IsTransient());
});
ec.UseRetry(retryConfig =>
{
// exponential backup in seconds
retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
retryConfig.Handle<DatabaseException>(x => x.IsTransient());
retryConfig.Handle<SqlException>(x => x.IsTransient());
});
});
目前只有第二种。第一个似乎被覆盖了。
我也试过像这样配置二级重试:
cfg.ReceiveEndpoint(
host,
"QueueName",
ec =>
{
ec.Consumer<ExampleConsumer>(context, factory =>
{
factory.UseRetry(retryConfig =>
{
// exponential backup in seconds for sql and concurrency exceptions
retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
retryConfig.Handle<DatabaseException>(x => x.IsTransient());
retryConfig.Handle<SqlException>(x => x.IsTransient());
});
});
ec.EnablePriority(5);
ec.UseRetry(retryConfig =>
{
// exponential backup in minutes for http request exceptions
retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
retryConfig.Handle<DatabaseException>(x => x.IsTransient());
retryConfig.Handle<SqlException>(x => x.IsTransient());
});
});
但这似乎也不起作用。有人知道如何为不同的异常类型应用不同的重试间隔吗?
MassTransit 将一切构建为管道,过滤器的顺序很重要。重写上面的示例应该可以解决问题(我所做的只是将消费者移到最后)。
cfg.ReceiveEndpoint("ExampleQueueName", ec =>
{
ec.EnablePriority(5);
ec.UseMessageRetry(r =>
{
r.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
r.Handle<HttpRequestException>(x => x.IsTransient());
r.Handle<SocketException>(x => x.IsTransient());
});
ec.UseMessageRetry(r =>
{
r.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
r.Handle<DatabaseException>(x => x.IsTransient());
r.Handle<SqlException>(x => x.IsTransient());
});
ec.Consumer<ExampleConsumer>(context);
});
我想要针对特定消费者的两个单独的重试策略。一个用于 HttpRequestException
和 SocketException
,另一个用于自定义 DatabaseException
和未处理的 SqlException
。我想这样做是因为我想为两者设置单独的指数重试间隔。
我有以下配置:
cfg.ReceiveEndpoint(
host,
"ExampleQueueName",
ec =>
{
ec.Consumer<ExampleConsumer>(context);
ec.EnablePriority(5);
ec.UseRetry(retryConfig =>
{
// exponential backup in minutes
retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
retryConfig.Handle<HttpRequestException>(x => x.IsTransient());
retryConfig.Handle<SocketException>(x => x.IsTransient());
});
ec.UseRetry(retryConfig =>
{
// exponential backup in seconds
retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
retryConfig.Handle<DatabaseException>(x => x.IsTransient());
retryConfig.Handle<SqlException>(x => x.IsTransient());
});
});
目前只有第二种。第一个似乎被覆盖了。
我也试过像这样配置二级重试:
cfg.ReceiveEndpoint(
host,
"QueueName",
ec =>
{
ec.Consumer<ExampleConsumer>(context, factory =>
{
factory.UseRetry(retryConfig =>
{
// exponential backup in seconds for sql and concurrency exceptions
retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
retryConfig.Handle<DatabaseException>(x => x.IsTransient());
retryConfig.Handle<SqlException>(x => x.IsTransient());
});
});
ec.EnablePriority(5);
ec.UseRetry(retryConfig =>
{
// exponential backup in minutes for http request exceptions
retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
retryConfig.Handle<DatabaseException>(x => x.IsTransient());
retryConfig.Handle<SqlException>(x => x.IsTransient());
});
});
但这似乎也不起作用。有人知道如何为不同的异常类型应用不同的重试间隔吗?
MassTransit 将一切构建为管道,过滤器的顺序很重要。重写上面的示例应该可以解决问题(我所做的只是将消费者移到最后)。
cfg.ReceiveEndpoint("ExampleQueueName", ec =>
{
ec.EnablePriority(5);
ec.UseMessageRetry(r =>
{
r.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
r.Handle<HttpRequestException>(x => x.IsTransient());
r.Handle<SocketException>(x => x.IsTransient());
});
ec.UseMessageRetry(r =>
{
r.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
r.Handle<DatabaseException>(x => x.IsTransient());
r.Handle<SqlException>(x => x.IsTransient());
});
ec.Consumer<ExampleConsumer>(context);
});