在 Azure 函数中按顺序(没有并发调用)处理服务总线消息
Processing Service Bus messages in order (without concurrent calls) in an Azure Function
我需要通过 "Azure function" 读取和处理来自 Azure 服务总线队列的消息。消息应该以正确的顺序处理,所以我需要避免并发调用。
我为此使用了 Azure Function 服务总线触发器(它是队列的唯一订阅者)。根据文档,我将 "servicebus/maxConcurrentCalls"(在 host.json 中)设置为 1。除此之外,我用 "Singleton" 属性修饰了函数。
除此之外,消息似乎由不同的线程以随机顺序处理。
我在这里想念什么?还是我误会了什么?
我使用的文档:
https://github.com/Azure/azure-webjobs-sdk/wiki/Singleton
host.json:
{
"serviceBus": {
"maxConcurrentCalls": 1
}
}
Azure 函数:
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
[Singleton]
public static void Run(BrokeredMessage myQueueItem, TraceWriter log)
{
Stream stream = myQueueItem.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
string messageContentStr = reader.ReadToEnd();
log.Info($"New TEST message: {messageContentStr} on thread {System.Threading.Thread.CurrentThread.ManagedThreadId}");
System.Threading.Thread.Sleep(2000);
}
这是日志记录的摘录。如您所见,有不同的线程。
并且,例如,"Message 19" 出现在 "Message 10" 之前。是的,我确定我将消息按正确的顺序放入队列中。
....
2018-05-09T09:09:33.686 [Info] New TEST message: Message 19 on thread 33
2018-05-09T09:09:35.702 [Info] Function completed (Success, Id=007eccd0-b5db-466a-91c1-4f53ec5a7b3a, Duration=2013ms)
2018-05-09T09:09:36.390 [Info] Function started (Id=b7160487-d10d-47a6-bab3-78da68a93498)
2018-05-09T09:09:36.420 [Info] New TEST message: Message 10 on thread 39
...
查看并确保您的服务总线队列未分区。如果它是分区的,则您有多个消息代理服务请求,并且无法保证消息传递的顺序。您可以在这里阅读更多相关信息:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning#not-using-a-partition-key
具体来说:
In the absence of a partition key, Service Bus distributes messages in
a round-robin fashion to all the fragments of the partitioned queue or
topic. If the chosen fragment is not available, Service Bus assigns
the message to a different fragment. This way, the send operation
succeeds despite the temporary unavailability of a messaging store.
However, you will not achieve the guaranteed ordering that a partition
key provides.
我需要通过 "Azure function" 读取和处理来自 Azure 服务总线队列的消息。消息应该以正确的顺序处理,所以我需要避免并发调用。
我为此使用了 Azure Function 服务总线触发器(它是队列的唯一订阅者)。根据文档,我将 "servicebus/maxConcurrentCalls"(在 host.json 中)设置为 1。除此之外,我用 "Singleton" 属性修饰了函数。 除此之外,消息似乎由不同的线程以随机顺序处理。 我在这里想念什么?还是我误会了什么?
我使用的文档: https://github.com/Azure/azure-webjobs-sdk/wiki/Singleton
host.json:
{
"serviceBus": {
"maxConcurrentCalls": 1
}
}
Azure 函数:
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
[Singleton]
public static void Run(BrokeredMessage myQueueItem, TraceWriter log)
{
Stream stream = myQueueItem.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
string messageContentStr = reader.ReadToEnd();
log.Info($"New TEST message: {messageContentStr} on thread {System.Threading.Thread.CurrentThread.ManagedThreadId}");
System.Threading.Thread.Sleep(2000);
}
这是日志记录的摘录。如您所见,有不同的线程。 并且,例如,"Message 19" 出现在 "Message 10" 之前。是的,我确定我将消息按正确的顺序放入队列中。
....
2018-05-09T09:09:33.686 [Info] New TEST message: Message 19 on thread 33
2018-05-09T09:09:35.702 [Info] Function completed (Success, Id=007eccd0-b5db-466a-91c1-4f53ec5a7b3a, Duration=2013ms)
2018-05-09T09:09:36.390 [Info] Function started (Id=b7160487-d10d-47a6-bab3-78da68a93498)
2018-05-09T09:09:36.420 [Info] New TEST message: Message 10 on thread 39
...
查看并确保您的服务总线队列未分区。如果它是分区的,则您有多个消息代理服务请求,并且无法保证消息传递的顺序。您可以在这里阅读更多相关信息:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning#not-using-a-partition-key
具体来说:
In the absence of a partition key, Service Bus distributes messages in a round-robin fashion to all the fragments of the partitioned queue or topic. If the chosen fragment is not available, Service Bus assigns the message to a different fragment. This way, the send operation succeeds despite the temporary unavailability of a messaging store. However, you will not achieve the guaranteed ordering that a partition key provides.