使用 Azure 函数和服务总线进行序列处理
Sequence processing with Azure Function & Service Bus
我遇到了 Azure 函数服务总线触发器的问题。
问题是 Azure 函数无法在处理新消息之前等待消息完成。它并行处理,在获取下一条消息之前不会等待 5 秒。但我需要它处理顺序(如下图所示)。
我该怎么做?
[FunctionName("HttpStartSingle")]
public static void Run(
[ServiceBusTrigger("MyServiceBusQueue", Connection = "Connection")]string myQueueItem,
[OrchestrationClient] DurableOrchestrationClient starter,
ILogger log)
{
Console.WriteLine($"MessageId={myQueueItem}");
Thread.Sleep(5000);
}
您可以通过两种方法完成此操作,
(1) 您正在寻找 Durable Function
with function chaining
For background jobs you often need to ensure that only one instance of
a particular orchestrator runs at a time. This can be done in Durable
Functions by assigning a specific instance ID to an orchestrator when
creating it.
(2) 根据您写入 Queue 的消息,您需要 partition 数据,这将自动处理您指定的消息顺序不需要通过 azure function
手动处理
总的来说,有序的消息传递不是我努力实现的,因为顺序可能并且在某些时候会被扭曲。也就是说,在某些情况下,这是必需的。为此,您应该使用 Durable Function 来编排消息或使用服务总线 message Sessions.
Azure Functions 最近添加了对有序消息传递的支持(传递部分的重音,因为处理仍然可能失败)。和普通的Function几乎一样,只是稍微改变一下,需要指示SDK使用session。
public async Task Run(
[ServiceBusTrigger("queue",
Connection = "ServiceBusConnectionString",
IsSessionsEnabled = true)] Message message, // Enable Sessions
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.MessageId)}");
await _cosmosDbClient.Save(...);
}
这里有一个 post 更多细节。
警告:使用会话将要求使用会话 ID 发送消息,可能需要在发送方进行更改。
我在 host.json
中使用此配置解决了我的问题
{
"version": "2.0",
"extensions": {
"serviceBus": {
"messageHandlerOptions": {
"maxConcurrentCalls": 1
}
}
}}
我遇到了 Azure 函数服务总线触发器的问题。 问题是 Azure 函数无法在处理新消息之前等待消息完成。它并行处理,在获取下一条消息之前不会等待 5 秒。但我需要它处理顺序(如下图所示)。 我该怎么做?
[FunctionName("HttpStartSingle")]
public static void Run(
[ServiceBusTrigger("MyServiceBusQueue", Connection = "Connection")]string myQueueItem,
[OrchestrationClient] DurableOrchestrationClient starter,
ILogger log)
{
Console.WriteLine($"MessageId={myQueueItem}");
Thread.Sleep(5000);
}
您可以通过两种方法完成此操作,
(1) 您正在寻找 Durable Function
with function chaining
For background jobs you often need to ensure that only one instance of a particular orchestrator runs at a time. This can be done in Durable Functions by assigning a specific instance ID to an orchestrator when creating it.
(2) 根据您写入 Queue 的消息,您需要 partition 数据,这将自动处理您指定的消息顺序不需要通过 azure function
手动处理总的来说,有序的消息传递不是我努力实现的,因为顺序可能并且在某些时候会被扭曲。也就是说,在某些情况下,这是必需的。为此,您应该使用 Durable Function 来编排消息或使用服务总线 message Sessions.
Azure Functions 最近添加了对有序消息传递的支持(传递部分的重音,因为处理仍然可能失败)。和普通的Function几乎一样,只是稍微改变一下,需要指示SDK使用session。
public async Task Run(
[ServiceBusTrigger("queue",
Connection = "ServiceBusConnectionString",
IsSessionsEnabled = true)] Message message, // Enable Sessions
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.MessageId)}");
await _cosmosDbClient.Save(...);
}
这里有一个 post 更多细节。
警告:使用会话将要求使用会话 ID 发送消息,可能需要在发送方进行更改。
我在 host.json
中使用此配置解决了我的问题{
"version": "2.0",
"extensions": {
"serviceBus": {
"messageHandlerOptions": {
"maxConcurrentCalls": 1
}
}
}}