在 Azure 函数中按顺序(没有并发调用)处理服务总线消息

Processing Service Bus messages in order (without concurrent calls) in an Azure Function

我需要通过 "Azure function" 读取和处理来自 Azure 服务总线队列的消息。消息应该以正确的顺序处理,所以我需要避免并发调用。

我为此使用了 Azure Function 服务总线触发器(它是队列的唯一订阅者)。根据文档,我将 "servicebus/maxConcurrentCalls"(在 host.json 中)设置为 1。除此之外,我用 "Singleton" 属性修饰了函数。 除此之外,消息似乎由不同的线程以随机顺序处理。 我在这里想念什么?还是我误会了什么?

我使用的文档: https://github.com/Azure/azure-webjobs-sdk/wiki/Singleton

host.json:

{
  "serviceBus": {
    "maxConcurrentCalls": 1
  }
}

Azure 函数:

using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

[Singleton]
public static void Run(BrokeredMessage myQueueItem, TraceWriter log)
{
    Stream stream = myQueueItem.GetBody<Stream>();
    StreamReader reader = new StreamReader(stream);
    string messageContentStr = reader.ReadToEnd();

    log.Info($"New TEST message: {messageContentStr} on thread {System.Threading.Thread.CurrentThread.ManagedThreadId}");   

    System.Threading.Thread.Sleep(2000);     
}

这是日志记录的摘录。如您所见,有不同的线程。 并且,例如,"Message 19" 出现在 "Message 10" 之前。是的,我确定我将消息按正确的顺序放入队列中。

....
2018-05-09T09:09:33.686 [Info] New TEST message: Message 19 on thread 33
2018-05-09T09:09:35.702 [Info] Function completed (Success, Id=007eccd0-b5db-466a-91c1-4f53ec5a7b3a, Duration=2013ms)
2018-05-09T09:09:36.390 [Info] Function started (Id=b7160487-d10d-47a6-bab3-78da68a93498)
2018-05-09T09:09:36.420 [Info] New TEST message: Message 10 on thread 39
...

查看并确保您的服务总线队列未分区。如果它是分区的,则您有多个消息代理服务请求,并且无法保证消息传递的顺序。您可以在这里阅读更多相关信息:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning#not-using-a-partition-key

具体来说:

In the absence of a partition key, Service Bus distributes messages in a round-robin fashion to all the fragments of the partitioned queue or topic. If the chosen fragment is not available, Service Bus assigns the message to a different fragment. This way, the send operation succeeds despite the temporary unavailability of a messaging store. However, you will not achieve the guaranteed ordering that a partition key provides.