Azure 函数和队列

Azure Function and queue

我有一个功能:

    public async static Task Run([QueueTrigger("efs-api-call-last-datetime", Connection = "StorageConnectionString")]DateTime queueItem,
        [Queue("efs-api-call-last-datetime", Connection = "StorageConnectionString")]CloudQueue inputQueue,
        TraceWriter log)
    {

然后我有很长的过程来处理来自队列的消息。问题是在我处理这条消息时,消息将在 30 秒后重新添加到队列中。我不需要添加这条消息并处理它两次。 我想要这样的代码:

        try
        {
             // long operation
        }
        catch(Exception ex)
        {
            // something wrong. Readd this message in 1 minute
            await inputQueue.AddMessageAsync(new CloudQueueMessage(
                JsonConvert.SerializeObject(queueItem)),
                timeToLive: null,
                initialVisibilityDelay: TimeSpan.FromMinutes(1),
                options: null,
                operationContext: null
                );
        }

并防止自动读取。有什么办法吗?

是的,您可以使同一消息出队两次。

原因:

1.Worker A 使消息 B 出列并且隐形超时到期。消息 B 再次可见,工作人员 C 将消息 B 出列,使工作人员 A 的 pop 收据无效。工人 A 完成工作,去删除消息 B 并抛出错误。这是最常见的。

2.The 对触发第一个 Azure 函数执行的原始消息的锁定可能会过期。这将导致 Queue 假定处理消息失败,然后它将使用该消息触发 Function 再次执行。

3.In 某些条件(非常频繁的队列轮询)您可以在 GetMessage 上两次获得相同的消息。这是一种罕见的竞争条件。工人 A 和 B 轮询速度非常快,同时进入队列,并且都收到相同的消息。这曾经在高轮询场景下更为常见(SDK 1.0 时间框架),但现在在以后的存储更新中变得更加罕见(不记得最近看到过这个)。

1 和 3 只有当你有超过 1 个工人时才会发生。

解决方法

安装 azure-webjobs-sdk 1.0.11015.0 版本(在函数门户的 'Settings' 页面中可见)。详情请参考fixing queue visibility renewals

这里有几件事。

1) 当有多个队列消息等待时,队列触发器获取一批消息并并发调用函数实例进行处理。默认情况下,批量大小为 16。但这可以在 Host.json 中配置。如果要最小化并行执行,可以将批处理大小设置为 1。 Microsoft document 对此进行了解释。

2) 由于过程很长 运行,因此您的消息似乎不完整,函数可能会超时,消息会再次显示。您应该尝试将您的功能分解为更小的功能。然后你可以使用持久函数来链接你必须做的工作。