在 Durable Functions 中处理服务总线消息

Handling service bus messages in Durable Functions

我有一个包含许多 activity 函数的 azure 持久函数。此功能由服务总线主题上的消息触发。我希望能够根据这些 activity 函数的结果手动 complete/defer/dead-letter 服务总线消息。但是,我没有找到从触发器函数中的 ServiceBusTrigger 访问 MessageReceiver 的方法。

我尝试将 MessageReceiver 作为输入传递给 Orchestration 函数,但这会在通过 context.GetInput<MessageReceiver>() 检索值时导致错误,因为 MessageReceiver 无法序列化为 JSON.

Function 'ImportTransactionOrchestrator (Orchestrator)' failed with an error. Reason: Newtonsoft.Json.JsonSerializationException: Unable to find a constructor to use for type Microsoft.Azure.ServiceBus.Core.MessageReceiver. A class should either have a default constructor, one constructor with arguments or a constructor marked with the JsonConstructor attribute.

以下是我的总结:

触发函数

[FunctionName(nameof(ServiceBusTrigger))]
public async Task ServiceBusTrigger(
    [ServiceBusTrigger(topicName: "topic", subscriptionName: "sub",
        Connection = "connection")]
    Message serviceBusMessage,
    MessageReceiver messageReceiver,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    var instanceId = await starter
        .StartNewAsync("ImportTransactionOrchestrator", null, (messageReceiver, serviceBusMessage));
}

编排函数

 [FunctionName(nameof(ImportTransactionOrchestrator))]
 public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
 {
     var (messageReceiver, serviceBusMessage ) = 
          context.GetInput<(MessageReceiver, Message)>(); // error occurs here
     {...}
 }

我的搜索结果不多,所以我怀疑我可能在尝试做一些奇怪的事情。如有任何帮助,我们将不胜感激!

您的 Orchestrator 需要负责完成或不完成消息。我相信 messageReceiver 无法序列化并作为参数传递给您的子活动。

您需要将 host.json 中的自动完成 属性 更改为 false:

{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "prefetchCount": 100,
            "messageHandlerOptions": {
                "autoComplete": false,
                "maxConcurrentCalls": 32,
                "maxAutoRenewDuration": "00:55:00"
            },
            "sessionHandlerOptions": {
                "autoComplete": false,
                "messageWaitTimeout": "00:00:30",
                "maxAutoRenewDuration": "00:55:00",
                "maxConcurrentSessions": 16
            }
        }
    }
}

并完成消息:

await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);

主要功能应该承担死字的责任。

示例代码如下所示:

string instanceId = await starter.StartNewAsync...
var orchestrationStatus = await starter.GetStatusAsync(instanceId);
var status = orchestrationStatus.RuntimeStatus.ToString().ToUpper();

if (!(bool)orchestrationStatus.Output)
{
   await messageReceiver.AbandonAsync(lockToken);
}