Azure Function Service Bus 触发器比队列中的消息触发次数更多
Azure Function Service Bus trigger fires more times than messages in the queue
我有两个服务总线队列和 Azure 服务总线队列触发器。
函数从队列中读取1000条消息转发给另一个(测试用)
发布到 Azure(计划消费)时,即使队列中只有 1000 条消息,我也会获得 1030 多个函数命中。我认为这与函数实例的数量有关。
问:如何对 1 个函数实例只处理 1 个唯一消息一次?
[FunctionName("Function1")]
public static async Task Run([ServiceBusTrigger("export-queue",
Connection = "Connection")]
string myQueueItem, ILogger log)
{
log.LogInformation($"Message: {myQueueItem}:{DateTime.Now}");
Thread.Sleep(2000);
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
var message = new Message(Encoding.UTF8.GetBytes(myQueueItem));
await queueClient.SendAsync(message);
await queueClient.CloseAsync();
}
默认情况下,ServiceBusTrigger
使用 PeekLock
mode。这在技术上意味着如果消息锁已被锁定,则消息可以被处理多次。这可以解释为什么对于 1,000 条传入消息,您会收到超过 1,000 次调用。
另一个角度是函数执行的持续时间与预取大小的结合。如果执行时间长到足以失去对某些预取消息的锁定,请减少 prefetch.
此外,考虑以一种您知道消息被处理的方式设计您的解决方案至少一次,而不是恰好一次.
对我有用。
为您的 host.json 添加扩展:
"extensions": {
"serviceBus": {
"prefetchCount": 1,
"messageHandlerOptions": {
"autoComplete": true,
"maxConcurrentCalls": 1,
"maxAutoRenewDuration": "00:05:00"
},
"sessionHandlerOptions": {
"autoComplete": true,
"messageWaitTimeout": "00:00:30",
"maxAutoRenewDuration": "00:55:00",
"maxConcurrentSessions": 10
}
}
}
我有两个服务总线队列和 Azure 服务总线队列触发器。
函数从队列中读取1000条消息转发给另一个(测试用) 发布到 Azure(计划消费)时,即使队列中只有 1000 条消息,我也会获得 1030 多个函数命中。我认为这与函数实例的数量有关。
问:如何对 1 个函数实例只处理 1 个唯一消息一次?
[FunctionName("Function1")]
public static async Task Run([ServiceBusTrigger("export-queue",
Connection = "Connection")]
string myQueueItem, ILogger log)
{
log.LogInformation($"Message: {myQueueItem}:{DateTime.Now}");
Thread.Sleep(2000);
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
var message = new Message(Encoding.UTF8.GetBytes(myQueueItem));
await queueClient.SendAsync(message);
await queueClient.CloseAsync();
}
默认情况下,ServiceBusTrigger
使用 PeekLock
mode。这在技术上意味着如果消息锁已被锁定,则消息可以被处理多次。这可以解释为什么对于 1,000 条传入消息,您会收到超过 1,000 次调用。
另一个角度是函数执行的持续时间与预取大小的结合。如果执行时间长到足以失去对某些预取消息的锁定,请减少 prefetch.
此外,考虑以一种您知道消息被处理的方式设计您的解决方案至少一次,而不是恰好一次.
对我有用。
为您的 host.json 添加扩展:
"extensions": {
"serviceBus": {
"prefetchCount": 1,
"messageHandlerOptions": {
"autoComplete": true,
"maxConcurrentCalls": 1,
"maxAutoRenewDuration": "00:05:00"
},
"sessionHandlerOptions": {
"autoComplete": true,
"messageWaitTimeout": "00:00:30",
"maxAutoRenewDuration": "00:55:00",
"maxConcurrentSessions": 10
}
}
}