Azure 服务总线:为处理失败的消息实施指数重试策略的最佳方式
Azure Service Bus: Best way to implement exponential retry policy for failed to process messages
我不断地以速览模式接收消息,如果处理失败(不是投递)则放弃它们。但是,该消息会立即再次可用并再次接收以进行处理。它再次快速失败,并且在最大交付后它是死信。
有没有办法配置topic/subscription在消息被放弃后等待释放?最好以指数方式。
当然,我也愿意通过代码征求建议。
无法在服务总线配置中设置指数退避。我遇到过同样的问题,并做了以下操作:
- 使消息出队,并将消息标记为已接收。
- 在 try/catch 块内执行处理。如果有异常,则在未来的某个时间点将新消息排入队列。
我们已将我们的服务总线消息队列负载包装在一个 class 中,它指定了传递尝试的次数。我们将交付尝试次数乘以一个常数,然后将该数字添加到当前日期时间以用于将来计划的交付。在超过我们想要尝试的投递尝试次数后,我们明确地死信该消息。
编辑 2020 年 7 月 17 日
考虑使用 Azure 持久任务框架,它内置了可自定义的重试策略。
我也在研究这个主题,我看到了 Microsoft 的 class RetryExponential class。
RetryExponential Class
Namespace: Microsoft.ServiceBus
Assembly: Microsoft.ServiceBus.dll
Represents an implementation of a retry policy. For each time the messaging operation must be retried, the delay between retries grows in a staggered, exponential manner.
public sealed class RetryExponential : Microsoft.ServiceBus.RetryPolicy
另一种选择是使用 MassTransit,它支持 Azure 服务总线。
看看它广泛的retry configuration。
请注意,MassTransit 在收到消息后有效地在内存中进行重试,因此您需要适当调整主题订阅的 MaxDeliveryCount
和 MaxAutoRenewDuration
设置。
您的配置可能类似于:
var busControl = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var host = cfg.Host(serviceBusConnectionString, hst => { });
cfg.UseMessageRetry(retryConfigurator =>
RetryConfigurationExtensions.Exponential(retryConfigurator, ...);
cfg.SubscriptionEndpoint(
"subscriptionName",
"topicPath",
e =>
{
e.Consumer<SomeConsumer>();
// Let MassTransit do the retries
e.MaxDeliveryCount = 1;
e.MaxAutoRenewDuration = TimeSpan.FromMinutes(10);
});
});
我不断地以速览模式接收消息,如果处理失败(不是投递)则放弃它们。但是,该消息会立即再次可用并再次接收以进行处理。它再次快速失败,并且在最大交付后它是死信。
有没有办法配置topic/subscription在消息被放弃后等待释放?最好以指数方式。
当然,我也愿意通过代码征求建议。
无法在服务总线配置中设置指数退避。我遇到过同样的问题,并做了以下操作:
- 使消息出队,并将消息标记为已接收。
- 在 try/catch 块内执行处理。如果有异常,则在未来的某个时间点将新消息排入队列。
我们已将我们的服务总线消息队列负载包装在一个 class 中,它指定了传递尝试的次数。我们将交付尝试次数乘以一个常数,然后将该数字添加到当前日期时间以用于将来计划的交付。在超过我们想要尝试的投递尝试次数后,我们明确地死信该消息。
编辑 2020 年 7 月 17 日 考虑使用 Azure 持久任务框架,它内置了可自定义的重试策略。
我也在研究这个主题,我看到了 Microsoft 的 class RetryExponential class。
RetryExponential Class
Namespace: Microsoft.ServiceBus
Assembly: Microsoft.ServiceBus.dllRepresents an implementation of a retry policy. For each time the messaging operation must be retried, the delay between retries grows in a staggered, exponential manner.
public sealed class RetryExponential : Microsoft.ServiceBus.RetryPolicy
另一种选择是使用 MassTransit,它支持 Azure 服务总线。
看看它广泛的retry configuration。
请注意,MassTransit 在收到消息后有效地在内存中进行重试,因此您需要适当调整主题订阅的 MaxDeliveryCount
和 MaxAutoRenewDuration
设置。
您的配置可能类似于:
var busControl = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var host = cfg.Host(serviceBusConnectionString, hst => { });
cfg.UseMessageRetry(retryConfigurator =>
RetryConfigurationExtensions.Exponential(retryConfigurator, ...);
cfg.SubscriptionEndpoint(
"subscriptionName",
"topicPath",
e =>
{
e.Consumer<SomeConsumer>();
// Let MassTransit do the retries
e.MaxDeliveryCount = 1;
e.MaxAutoRenewDuration = TimeSpan.FromMinutes(10);
});
});