如何防止 Azure webjob 同时多次处理同一条消息
How to prevent Azure webjob processing same message multiple times concurrently
我有一个 Azure WebJob 项目,我 运行 在我的本地开发机器上。它正在侦听 Azure 服务总线消息队列。没有像 Topics 这样的事情发生,只是最基本的消息队列。
它是receiving/processing同一条消息多次,收到消息后立即启动两次,然后在处理消息时间歇性地启动。
问题:
- 为什么我会立即多次收到同一条消息?似乎在应用 PeekLock 之前重新获取?
- 邮件还在处理中,怎么又重新收到了?我可以设置 PeekLock 持续时间,还是以某种方式将消息锁定为仅处理一次
- 如何确保队列中的每条消息只被处理一次?
- 我希望能够一次处理多条消息,只是多次处理同一条消息,所以将 MaxConcurrentCalls 设置为 1 似乎不是我的答案,或者我误解了 属性?
我使用的是异步函数、简单注入器和自定义 JobActivator,因此我的函数签名不是静态无效方法,而是:
public async Task ProcessQueueMessage([ServiceBusTrigger("AnyQueue")] MediaEncoderQueueItem message, TextWriter log) {...}
在作业中,它正在 blob 服务上移动一些文件,并从媒体服务调用(并等待)媒体编码器。因此,虽然 Web 作业本身没有进行大量处理,但它需要相当长的时间(对于某些文件,需要 15 分钟)。
应用程序正在启动,当我 post 向队列发送消息时,它会响应。但是,一收到消息就多次收到:
Executing: 'Functions.ProcessQueueMessage' - Reason: 'New ServiceBus message detected on 'MyQueue'.'
Executing: 'Functions.ProcessQueueMessage' - Reason: 'New ServiceBus message detected on 'MyQueue'.'
此外,虽然任务是 运行(并且我看到了媒体服务功能的输出),但它将从队列中获得另一个 "copy"。
终于在任务完成后,它仍然断断续续地处理同一条消息。
我怀疑发生的事情如下:
最长 DurationLock
可以是 5 分钟。如果消息的处理在 5 分钟内完成,消息将被标记为已完成并从代理中删除。否则,如果处理时间超过 5 分钟(我们失去了对消息的锁定),消息将重新出现并再次被消费。您可以通过查看邮件的 DeliveryCount
来验证这一点。
要解决这个问题,您可以使用 BrokeredMessage.RenewLockAsync()
.
在即将过期之前更新消息锁定
我有一个 Azure WebJob 项目,我 运行 在我的本地开发机器上。它正在侦听 Azure 服务总线消息队列。没有像 Topics 这样的事情发生,只是最基本的消息队列。
它是receiving/processing同一条消息多次,收到消息后立即启动两次,然后在处理消息时间歇性地启动。
问题:
- 为什么我会立即多次收到同一条消息?似乎在应用 PeekLock 之前重新获取?
- 邮件还在处理中,怎么又重新收到了?我可以设置 PeekLock 持续时间,还是以某种方式将消息锁定为仅处理一次
- 如何确保队列中的每条消息只被处理一次?
- 我希望能够一次处理多条消息,只是多次处理同一条消息,所以将 MaxConcurrentCalls 设置为 1 似乎不是我的答案,或者我误解了 属性?
我使用的是异步函数、简单注入器和自定义 JobActivator,因此我的函数签名不是静态无效方法,而是:
public async Task ProcessQueueMessage([ServiceBusTrigger("AnyQueue")] MediaEncoderQueueItem message, TextWriter log) {...}
在作业中,它正在 blob 服务上移动一些文件,并从媒体服务调用(并等待)媒体编码器。因此,虽然 Web 作业本身没有进行大量处理,但它需要相当长的时间(对于某些文件,需要 15 分钟)。
应用程序正在启动,当我 post 向队列发送消息时,它会响应。但是,一收到消息就多次收到:
Executing: 'Functions.ProcessQueueMessage' - Reason: 'New ServiceBus message detected on 'MyQueue'.'
Executing: 'Functions.ProcessQueueMessage' - Reason: 'New ServiceBus message detected on 'MyQueue'.'
此外,虽然任务是 运行(并且我看到了媒体服务功能的输出),但它将从队列中获得另一个 "copy"。
终于在任务完成后,它仍然断断续续地处理同一条消息。
我怀疑发生的事情如下:
最长 DurationLock
可以是 5 分钟。如果消息的处理在 5 分钟内完成,消息将被标记为已完成并从代理中删除。否则,如果处理时间超过 5 分钟(我们失去了对消息的锁定),消息将重新出现并再次被消费。您可以通过查看邮件的 DeliveryCount
来验证这一点。
要解决这个问题,您可以使用 BrokeredMessage.RenewLockAsync()
.