在 Durable Functions 中处理服务总线消息
Handling service bus messages in Durable Functions
我有一个包含许多 activity 函数的 azure 持久函数。此功能由服务总线主题上的消息触发。我希望能够根据这些 activity 函数的结果手动 complete/defer/dead-letter 服务总线消息。但是,我没有找到从触发器函数中的 ServiceBusTrigger 访问 MessageReceiver
的方法。
我尝试将 MessageReceiver
作为输入传递给 Orchestration 函数,但这会在通过 context.GetInput<MessageReceiver>()
检索值时导致错误,因为 MessageReceiver
无法序列化为 JSON.
Function 'ImportTransactionOrchestrator (Orchestrator)' failed with an error. Reason: Newtonsoft.Json.JsonSerializationException: Unable to find a constructor to use for type Microsoft.Azure.ServiceBus.Core.MessageReceiver. A class should either have a default constructor, one constructor with arguments or a constructor marked with the JsonConstructor attribute.
以下是我的总结:
触发函数
[FunctionName(nameof(ServiceBusTrigger))]
public async Task ServiceBusTrigger(
[ServiceBusTrigger(topicName: "topic", subscriptionName: "sub",
Connection = "connection")]
Message serviceBusMessage,
MessageReceiver messageReceiver,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
var instanceId = await starter
.StartNewAsync("ImportTransactionOrchestrator", null, (messageReceiver, serviceBusMessage));
}
编排函数
[FunctionName(nameof(ImportTransactionOrchestrator))]
public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var (messageReceiver, serviceBusMessage ) =
context.GetInput<(MessageReceiver, Message)>(); // error occurs here
{...}
}
我的搜索结果不多,所以我怀疑我可能在尝试做一些奇怪的事情。如有任何帮助,我们将不胜感激!
您的 Orchestrator 需要负责完成或不完成消息。我相信 messageReceiver 无法序列化并作为参数传递给您的子活动。
您需要将 host.json 中的自动完成 属性 更改为 false:
{
"version": "2.0",
"extensions": {
"serviceBus": {
"prefetchCount": 100,
"messageHandlerOptions": {
"autoComplete": false,
"maxConcurrentCalls": 32,
"maxAutoRenewDuration": "00:55:00"
},
"sessionHandlerOptions": {
"autoComplete": false,
"messageWaitTimeout": "00:00:30",
"maxAutoRenewDuration": "00:55:00",
"maxConcurrentSessions": 16
}
}
}
}
并完成消息:
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
主要功能应该承担死字的责任。
示例代码如下所示:
string instanceId = await starter.StartNewAsync...
var orchestrationStatus = await starter.GetStatusAsync(instanceId);
var status = orchestrationStatus.RuntimeStatus.ToString().ToUpper();
if (!(bool)orchestrationStatus.Output)
{
await messageReceiver.AbandonAsync(lockToken);
}
我有一个包含许多 activity 函数的 azure 持久函数。此功能由服务总线主题上的消息触发。我希望能够根据这些 activity 函数的结果手动 complete/defer/dead-letter 服务总线消息。但是,我没有找到从触发器函数中的 ServiceBusTrigger 访问 MessageReceiver
的方法。
我尝试将 MessageReceiver
作为输入传递给 Orchestration 函数,但这会在通过 context.GetInput<MessageReceiver>()
检索值时导致错误,因为 MessageReceiver
无法序列化为 JSON.
Function 'ImportTransactionOrchestrator (Orchestrator)' failed with an error. Reason: Newtonsoft.Json.JsonSerializationException: Unable to find a constructor to use for type Microsoft.Azure.ServiceBus.Core.MessageReceiver. A class should either have a default constructor, one constructor with arguments or a constructor marked with the JsonConstructor attribute.
以下是我的总结:
触发函数
[FunctionName(nameof(ServiceBusTrigger))]
public async Task ServiceBusTrigger(
[ServiceBusTrigger(topicName: "topic", subscriptionName: "sub",
Connection = "connection")]
Message serviceBusMessage,
MessageReceiver messageReceiver,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
var instanceId = await starter
.StartNewAsync("ImportTransactionOrchestrator", null, (messageReceiver, serviceBusMessage));
}
编排函数
[FunctionName(nameof(ImportTransactionOrchestrator))]
public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var (messageReceiver, serviceBusMessage ) =
context.GetInput<(MessageReceiver, Message)>(); // error occurs here
{...}
}
我的搜索结果不多,所以我怀疑我可能在尝试做一些奇怪的事情。如有任何帮助,我们将不胜感激!
您的 Orchestrator 需要负责完成或不完成消息。我相信 messageReceiver 无法序列化并作为参数传递给您的子活动。
您需要将 host.json 中的自动完成 属性 更改为 false:
{
"version": "2.0",
"extensions": {
"serviceBus": {
"prefetchCount": 100,
"messageHandlerOptions": {
"autoComplete": false,
"maxConcurrentCalls": 32,
"maxAutoRenewDuration": "00:55:00"
},
"sessionHandlerOptions": {
"autoComplete": false,
"messageWaitTimeout": "00:00:30",
"maxAutoRenewDuration": "00:55:00",
"maxConcurrentSessions": 16
}
}
}
}
并完成消息:
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
主要功能应该承担死字的责任。
示例代码如下所示:
string instanceId = await starter.StartNewAsync...
var orchestrationStatus = await starter.GetStatusAsync(instanceId);
var status = orchestrationStatus.RuntimeStatus.ToString().ToUpper();
if (!(bool)orchestrationStatus.Output)
{
await messageReceiver.AbandonAsync(lockToken);
}