Azure 服务总线主题的毒消息处理最佳实践
best practices with poison message handling for Azure service bus topic
处理来自 Azure 服务总线的有害消息(使用时抛出异常)可能会导致循环,直到重试次数达到主题订阅的 maxDeliveryCount
设置。
- Azure 服务总线添加的消息
SequenceNumber
是否在每次尝试失败时不断增加直到达到 maxDeliveryCount
?
- 设置
maxDeliveryCount
= 1 是处理有毒消息的最佳做法,这样一旦消息失败,消费者就不会尝试两次处理消息
最佳做法取决于您的应用程序和重试方法。
大多数时候我注意到消息发送失败
依赖服务不可用(Redis,SQL连接问题)
错误消息(消息没有强制参数或某些值不正确)
处理代码问题(消息处理代码中的错误)
对于第一种和第三种情况,我创建了 C# 网络作业以 运行 并重新处理死信消息。
下面是我的代码
internal class Program
{
private static string connectionString = ConfigurationSettings.AppSettings["GroupAssetConnection"];
private static string topicName = ConfigurationSettings.AppSettings["GroupAssetTopic"];
private static string subscriptionName = ConfigurationSettings.AppSettings["GroupAssetSubscription"];
private static string databaseEndPoint = ConfigurationSettings.AppSettings["DatabaseEndPoint"];
private static string databaseKey = ConfigurationSettings.AppSettings["DatabaseKey"];
private static string deadLetterQueuePath = "/$DeadLetterQueue";
private static void Main(string[] args)
{
try
{
ReadDLQMessages(groupAssetSyncService, log);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw;
}
finally
{
documentClient.Dispose();
}
Console.WriteLine("All message read successfully from Deadletter queue");
Console.ReadLine();
}
public static void ReadDLQMessages(IGroupAssetSyncService groupSyncService, ILog log)
{
int counter = 1;
SubscriptionClient subscriptionClient = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionName + deadLetterQueuePath);
while (true)
{
BrokeredMessage bmessgage = subscriptionClient.Receive(TimeSpan.FromMilliseconds(500));
if (bmessgage != null)
{
string message = new StreamReader(bmessgage.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
syncService.UpdateDataAsync(message).GetAwaiter().GetResult();
Console.WriteLine($"{counter} message Received");
counter++;
bmessgage.Complete();
}
else
{
break;
}
}
subscriptionClient.Close();
}
}
对于第二种情况,我们手动验证死信消息(自定义 UI/ 服务总线探索),有时我们会更正消息数据,有时我们会清除消息并清除队列。
我不会推荐maxDeliveryCount=1
。如果出现 network/connection 问题,内置重试将处理并从队列中清除。当我在财务应用程序中工作时,我保持 maxDeliveryCount=5
而在我的 IoT 应用程序中是 maxDeliveryCount=3
.
如果您是批量阅读消息,如果任何消息发生错误,将重新处理一个完整的批次。
SequenceNumber 序列号可以作为唯一标识符被信任,因为它是由中央和中立的机构分配的,而不是由客户分配的。它还表示到达的真实顺序,并且比作为顺序标准的时间戳更精确,因为时间戳在极端消息速率下可能没有足够高的分辨率,并且在以下情况下可能会受到(但最小)时钟偏差的影响经纪人所有权在节点之间转移。
处理来自 Azure 服务总线的有害消息(使用时抛出异常)可能会导致循环,直到重试次数达到主题订阅的 maxDeliveryCount
设置。
- Azure 服务总线添加的消息
SequenceNumber
是否在每次尝试失败时不断增加直到达到maxDeliveryCount
? - 设置
maxDeliveryCount
= 1 是处理有毒消息的最佳做法,这样一旦消息失败,消费者就不会尝试两次处理消息
最佳做法取决于您的应用程序和重试方法。
大多数时候我注意到消息发送失败
依赖服务不可用(Redis,SQL连接问题)
错误消息(消息没有强制参数或某些值不正确)
处理代码问题(消息处理代码中的错误)
对于第一种和第三种情况,我创建了 C# 网络作业以 运行 并重新处理死信消息。
下面是我的代码
internal class Program
{
private static string connectionString = ConfigurationSettings.AppSettings["GroupAssetConnection"];
private static string topicName = ConfigurationSettings.AppSettings["GroupAssetTopic"];
private static string subscriptionName = ConfigurationSettings.AppSettings["GroupAssetSubscription"];
private static string databaseEndPoint = ConfigurationSettings.AppSettings["DatabaseEndPoint"];
private static string databaseKey = ConfigurationSettings.AppSettings["DatabaseKey"];
private static string deadLetterQueuePath = "/$DeadLetterQueue";
private static void Main(string[] args)
{
try
{
ReadDLQMessages(groupAssetSyncService, log);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
throw;
}
finally
{
documentClient.Dispose();
}
Console.WriteLine("All message read successfully from Deadletter queue");
Console.ReadLine();
}
public static void ReadDLQMessages(IGroupAssetSyncService groupSyncService, ILog log)
{
int counter = 1;
SubscriptionClient subscriptionClient = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionName + deadLetterQueuePath);
while (true)
{
BrokeredMessage bmessgage = subscriptionClient.Receive(TimeSpan.FromMilliseconds(500));
if (bmessgage != null)
{
string message = new StreamReader(bmessgage.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
syncService.UpdateDataAsync(message).GetAwaiter().GetResult();
Console.WriteLine($"{counter} message Received");
counter++;
bmessgage.Complete();
}
else
{
break;
}
}
subscriptionClient.Close();
}
}
对于第二种情况,我们手动验证死信消息(自定义 UI/ 服务总线探索),有时我们会更正消息数据,有时我们会清除消息并清除队列。
我不会推荐maxDeliveryCount=1
。如果出现 network/connection 问题,内置重试将处理并从队列中清除。当我在财务应用程序中工作时,我保持 maxDeliveryCount=5
而在我的 IoT 应用程序中是 maxDeliveryCount=3
.
如果您是批量阅读消息,如果任何消息发生错误,将重新处理一个完整的批次。
SequenceNumber 序列号可以作为唯一标识符被信任,因为它是由中央和中立的机构分配的,而不是由客户分配的。它还表示到达的真实顺序,并且比作为顺序标准的时间戳更精确,因为时间戳在极端消息速率下可能没有足够高的分辨率,并且在以下情况下可能会受到(但最小)时钟偏差的影响经纪人所有权在节点之间转移。