使用 Azure 函数和服务总线进行序列处理

Sequence processing with Azure Function & Service Bus

我遇到了 Azure 函数服务总线触发器的问题。 问题是 Azure 函数无法在处理新消息之前等待消息完成。它并行处理,在获取下一条消息之前不会等待 5 秒。但我需要它处理顺序(如下图所示)。 我该怎么做?

[FunctionName("HttpStartSingle")]
    public static void Run(
 [ServiceBusTrigger("MyServiceBusQueue", Connection = "Connection")]string myQueueItem,
[OrchestrationClient] DurableOrchestrationClient starter,
ILogger log)
    {
        Console.WriteLine($"MessageId={myQueueItem}");
        Thread.Sleep(5000);
    }

您可以通过两种方法完成此操作,

(1) 您正在寻找 Durable Function with function chaining

For background jobs you often need to ensure that only one instance of a particular orchestrator runs at a time. This can be done in Durable Functions by assigning a specific instance ID to an orchestrator when creating it.

(2) 根据您写入 Queue 的消息,您需要 partition 数据,这将自动处理您指定的消息顺序不需要通过 azure function

手动处理

总的来说,有序的消息传递不是我努力实现的,因为顺序可能并且在某些时候会被扭曲。也就是说,在某些情况下,这是必需的。为此,您应该使用 Durable Function 来编排消息或使用服务总线 message Sessions.

Azure Functions 最近添加了对有序消息传递的支持(传递部分的重音,因为处理仍然可能失败)。和普通的Function几乎一样,只是稍微改变一下,需要指示SDK使用session。

public async Task Run(
  [ServiceBusTrigger("queue", 
   Connection = "ServiceBusConnectionString",
   IsSessionsEnabled = true)] Message message, // Enable Sessions
   ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.MessageId)}");
    await _cosmosDbClient.Save(...);
}

这里有一个 post 更多细节。

警告:使用会话将要求使用会话 ID 发送消息,可能需要在发送方进行更改。

我在 host.json

中使用此配置解决了我的问题
{
"version": "2.0",
"extensions": {
    "serviceBus": {
        "messageHandlerOptions": {
            "maxConcurrentCalls": 1
        }
    }
}}