Azure 存储队列消息在一段时间后一次性全部移动到毒队列
Azure storage queue messages are moved all at once to poison queue after a period of time
重现步骤
向队列添加消息(约 500 条消息)。
添加将处理队列消息的队列触发函数应用程序。
预期行为
队列中的所有消息都已处理。如果在处理消息时抛出异常,则将消息发送到毒物队列。
实际行为
开始处理消息。一段时间后(大约 30-60 秒),留在队列中的所有消息都将移至毒物队列。在那一刻处理的消息没有抛出异常。
我们检查了队列消息 lifetime 是默认消息:7 天。
我们尝试将 WindowsAzure.Storage 软件包降级为 7.2.1 但没有成功。
WindowsAzure.Storage我们使用的版本是8.5.0.
会不会是函数应用程序崩溃,导致所有消息都被移到毒队列?
编辑
这是将消息添加到队列的方式:
public async Task AddMessage()
{
var queueModel = new QueueModel
{
// queue model properties
};
await AddMessageToQueueAsync(queueModel);
}
public async Task AddMessageToQueueAsync(T messageObject, TimeSpan? initialVisibilityDelay = null)
{
var queue = GetQueue();
var jsonMessage = JsonConvert.SerializeObject(messageObject);
var message = new CloudQueueMessage(jsonMessage);
await queue.AddMessageAsync(message, TimeSpan.FromDays(7), initialVisibilityDelay, new QueueRequestOptions(), new OperationContext());
}
private CloudQueue GetQueue()
{
var queueClient = _storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(_queueName);
queue.CreateIfNotExists();
return queue;
}
这是处理消息的函数应用程序:
[FunctionName("ProcessQueue")]
public static async Task Run([QueueTrigger("queue-name", Connection = "AzureWebJobsStorage")]string queueItem, TraceWriter log)
{
if (String.IsNullOrEmpty(queueItem))
{
return;
}
var queueModel = JsonConvert.DeserializeObject<QueueModel>(queueItem);
if (queueModel == null)
{
return;
}
try
{
// process the message
}
catch (Exception ex)
{
// message is moved to the poison-queue but no exception is thrown
log.Error(ex.Message, ex);
}
}
发现问题。问题出在一个剩余的函数应用程序,它监听同一个队列并与当前函数应用程序并行处理消息。因为两个函数应用程序都处理来自同一个队列的消息,所以在某些时候,留在队列中的消息都被移到了毒队列。 (可能是从队列中读取消息时出现的一些并发问题)。
重现步骤
向队列添加消息(约 500 条消息)。 添加将处理队列消息的队列触发函数应用程序。
预期行为
队列中的所有消息都已处理。如果在处理消息时抛出异常,则将消息发送到毒物队列。
实际行为
开始处理消息。一段时间后(大约 30-60 秒),留在队列中的所有消息都将移至毒物队列。在那一刻处理的消息没有抛出异常。
我们检查了队列消息 lifetime 是默认消息:7 天。 我们尝试将 WindowsAzure.Storage 软件包降级为 7.2.1 但没有成功。
WindowsAzure.Storage我们使用的版本是8.5.0.
会不会是函数应用程序崩溃,导致所有消息都被移到毒队列?
编辑
这是将消息添加到队列的方式:
public async Task AddMessage()
{
var queueModel = new QueueModel
{
// queue model properties
};
await AddMessageToQueueAsync(queueModel);
}
public async Task AddMessageToQueueAsync(T messageObject, TimeSpan? initialVisibilityDelay = null)
{
var queue = GetQueue();
var jsonMessage = JsonConvert.SerializeObject(messageObject);
var message = new CloudQueueMessage(jsonMessage);
await queue.AddMessageAsync(message, TimeSpan.FromDays(7), initialVisibilityDelay, new QueueRequestOptions(), new OperationContext());
}
private CloudQueue GetQueue()
{
var queueClient = _storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(_queueName);
queue.CreateIfNotExists();
return queue;
}
这是处理消息的函数应用程序:
[FunctionName("ProcessQueue")]
public static async Task Run([QueueTrigger("queue-name", Connection = "AzureWebJobsStorage")]string queueItem, TraceWriter log)
{
if (String.IsNullOrEmpty(queueItem))
{
return;
}
var queueModel = JsonConvert.DeserializeObject<QueueModel>(queueItem);
if (queueModel == null)
{
return;
}
try
{
// process the message
}
catch (Exception ex)
{
// message is moved to the poison-queue but no exception is thrown
log.Error(ex.Message, ex);
}
}
发现问题。问题出在一个剩余的函数应用程序,它监听同一个队列并与当前函数应用程序并行处理消息。因为两个函数应用程序都处理来自同一个队列的消息,所以在某些时候,留在队列中的消息都被移到了毒队列。 (可能是从队列中读取消息时出现的一些并发问题)。