Azure 存储队列消息在一段时间后一次性全部移动到毒队列

Azure storage queue messages are moved all at once to poison queue after a period of time

重现步骤

向队列添加消息(约 500 条消息)。 添加将处理队列消息的队列触发函数应用程序。

预期行为

队列中的所有消息都已处理。如果在处理消息时抛出异常,则将消息发送到毒物队列。

实际行为

开始处理消息。一段时间后(大约 30-60 秒),留在队列中的所有消息都将移至毒物队列。在那一刻处理的消息没有抛出异常。

我们检查了队列消息 lifetime 是默认消息:7 天。 我们尝试将 WindowsAzure.Storage 软件包降级为 7.2.1 但没有成功。

WindowsAzure.Storage我们使用的版本是8.5.0.

会不会是函数应用程序崩溃,导致所有消息都被移到毒队列?

编辑

这是将消息添加到队列的方式:

public async Task AddMessage()
{
    var queueModel = new QueueModel
    {
        // queue model properties
    };

    await AddMessageToQueueAsync(queueModel);
}


public async Task AddMessageToQueueAsync(T messageObject, TimeSpan? initialVisibilityDelay = null)
{
    var queue = GetQueue();
    var jsonMessage = JsonConvert.SerializeObject(messageObject);
    var message = new CloudQueueMessage(jsonMessage);

    await queue.AddMessageAsync(message, TimeSpan.FromDays(7), initialVisibilityDelay, new QueueRequestOptions(), new OperationContext());
}

private CloudQueue GetQueue()
{
    var queueClient = _storageAccount.CreateCloudQueueClient();

    var queue = queueClient.GetQueueReference(_queueName);
    queue.CreateIfNotExists();

    return queue;
}

这是处理消息的函数应用程序:

[FunctionName("ProcessQueue")]
public static async Task Run([QueueTrigger("queue-name", Connection = "AzureWebJobsStorage")]string queueItem, TraceWriter log)
{
    if (String.IsNullOrEmpty(queueItem))
    {
        return;
    }

    var queueModel = JsonConvert.DeserializeObject<QueueModel>(queueItem);
    if (queueModel == null)
    {
        return;
    }

    try
    {
        // process the message
    }
    catch (Exception ex)
    {
        // message is moved to the poison-queue but no exception is thrown
        log.Error(ex.Message, ex);
    }
}

发现问题。问题出在一个剩余的函数应用程序,它监听同一个队列并与当前函数应用程序并行处理消息。因为两个函数应用程序都处理来自同一个队列的消息,所以在某些时候,留在队列中的消息都被移到了毒队列。 (可能是从队列中读取消息时出现的一些并发问题)。