来自服务总线队列的消息在 activity 函数中出错时消失
Message from servicebus queue disappears on error in activity function
我开发了一个 Azure Durable Functions 应用程序,可以触发新的服务总线队列消息。当没有错误发生时它工作正常,但是当 activity 函数中发生错误时,它记录它失败但消息从队列中永远消失了。可能是什么原因造成的,如何防止消息因错误从队列中消失?
这里是可重现的代码,它是VS2017中新的Azure Function模板生成的代码,只在城市为"Seattle"时添加了一个异常,并且它是ServicebusTrigger而不是HttpTrigger。
[FunctionName("Test")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
var outputs = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "London"));
// returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
return outputs;
}
[FunctionName("Test_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
log.LogInformation($"Saying hello to {name}.");
if (name == "Seattle")
throw new Exception("An error occurs");
return $"Hello {name}!";
}
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}
更新:当我在 Orchestration 客户端函数中出现异常时,它会做正确的事情,例如重试并在重试失败 x 次时将消息放入死信队列。
所以我设法通过使用这个 while 循环更新客户端函数来解决这个问题,检查 failed/terminated/canceled 状态。
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
var status = await starter.GetStatusAsync(instanceId);
while (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
{
System.Threading.Thread.Sleep(1000);
status = await starter.GetStatusAsync(instanceId);
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled)
{
throw new Exception("Orchestration failed with error: " + status.Output);
}
}
}
然而,这对我来说似乎是一个 hack,而且我没有在任何 MS 示例代码中看到过这种类型的代码。我想这应该由持久函数框架来处理。还有其他方法可以使服务总线触发器在持久函数中工作吗?
此行为是设计使然。启动编排是异步的 - 即 StartNewAsync
API 不会自动等待编排 运行 或完成。在内部,StartNewAsync
只是将一条消息放入 Azure 存储队列并将一个条目写入 Azure 存储 table。如果成功,那么您的服务总线函数将继续 运行ning 并成功完成,此时消息将被删除。
如果您确实需要重试服务总线队列消息,您的解决方法是接受table,但我怀疑您为什么需要这样做。编排本身可以在不依赖服务总线的情况下管理自己的重试。例如,您可以使用 CallActivityWithRetryAsync 在业务流程内部重试。
请参阅 Durable Functions 文档的 Error Handling 主题。
我知道这是一个旧线程,但我想分享一下我是如何使用 ServiceBusTrigger
和 WaitForCompletionOrCreateCheckStatusResponseAsync
来实现它的。
[FunctionName(nameof(QueueTriggerFunction))]
public async Task QueueTriggerFunction(
[ServiceBusTrigger("queue-name", Connection = "connectionstring-key")]string queueMessage,
MessageReceiver messageReceiver,
string lockToken,
string messageId,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
//note: autocomplete is disabled
try
{
//start durable function
var instanceId = await starter.StartNewAsync(nameof(OrchestratorFunction), queueMessage);
//get the payload (we want to use the status uri)
var payload = starter.CreateHttpManagementPayload(instanceId);
//instruct QueueTriggerFunction to wait for response
await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(new HttpRequestMessage(HttpMethod.Get, payload.StatusQueryGetUri), instanceId);
//response ready, get status
var status = await starter.GetStatusAsync(instanceId);
//act on status
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
//like completing the message
await messageReceiver.CompleteAsync(lockToken);
log.LogInformation($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} succeeded [MessageId={messageId}]");
}
else
{
//or deadletter the sob
await messageReceiver.DeadLetterAsync(lockToken);
log.LogError($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} failed [MessageId={messageId}]");
}
}
catch (Exception ex)
{
//not sure what went wrong, let the lock expire and try again (until max retry attempts is reached)
log.LogError(ex, $"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: handler failed [MessageId={messageId}]");
}
}
事实是,互联网上的所有示例都使用 HttpTrigger
并使用该触发器的 httprequest 来检查是否完成,但您没有使用 ServiceBusTrigger
。此外,我认为那是不正确的,您应该使用负载调用中的状态 uri,就像我在这里使用协调器函数的 instanceId 所做的那样。
我开发了一个 Azure Durable Functions 应用程序,可以触发新的服务总线队列消息。当没有错误发生时它工作正常,但是当 activity 函数中发生错误时,它记录它失败但消息从队列中永远消失了。可能是什么原因造成的,如何防止消息因错误从队列中消失?
这里是可重现的代码,它是VS2017中新的Azure Function模板生成的代码,只在城市为"Seattle"时添加了一个异常,并且它是ServicebusTrigger而不是HttpTrigger。
[FunctionName("Test")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
var outputs = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "London"));
// returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
return outputs;
}
[FunctionName("Test_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
log.LogInformation($"Saying hello to {name}.");
if (name == "Seattle")
throw new Exception("An error occurs");
return $"Hello {name}!";
}
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}
更新:当我在 Orchestration 客户端函数中出现异常时,它会做正确的事情,例如重试并在重试失败 x 次时将消息放入死信队列。
所以我设法通过使用这个 while 循环更新客户端函数来解决这个问题,检查 failed/terminated/canceled 状态。
[FunctionName("Test_HttpStart")]
public static async Task ServiceBusStart(
[ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
[OrchestrationClient]DurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
string instanceId = await starter.StartNewAsync("Test", msg);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
var status = await starter.GetStatusAsync(instanceId);
while (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
{
System.Threading.Thread.Sleep(1000);
status = await starter.GetStatusAsync(instanceId);
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled)
{
throw new Exception("Orchestration failed with error: " + status.Output);
}
}
}
然而,这对我来说似乎是一个 hack,而且我没有在任何 MS 示例代码中看到过这种类型的代码。我想这应该由持久函数框架来处理。还有其他方法可以使服务总线触发器在持久函数中工作吗?
此行为是设计使然。启动编排是异步的 - 即 StartNewAsync
API 不会自动等待编排 运行 或完成。在内部,StartNewAsync
只是将一条消息放入 Azure 存储队列并将一个条目写入 Azure 存储 table。如果成功,那么您的服务总线函数将继续 运行ning 并成功完成,此时消息将被删除。
如果您确实需要重试服务总线队列消息,您的解决方法是接受table,但我怀疑您为什么需要这样做。编排本身可以在不依赖服务总线的情况下管理自己的重试。例如,您可以使用 CallActivityWithRetryAsync 在业务流程内部重试。
请参阅 Durable Functions 文档的 Error Handling 主题。
我知道这是一个旧线程,但我想分享一下我是如何使用 ServiceBusTrigger
和 WaitForCompletionOrCreateCheckStatusResponseAsync
来实现它的。
[FunctionName(nameof(QueueTriggerFunction))]
public async Task QueueTriggerFunction(
[ServiceBusTrigger("queue-name", Connection = "connectionstring-key")]string queueMessage,
MessageReceiver messageReceiver,
string lockToken,
string messageId,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
//note: autocomplete is disabled
try
{
//start durable function
var instanceId = await starter.StartNewAsync(nameof(OrchestratorFunction), queueMessage);
//get the payload (we want to use the status uri)
var payload = starter.CreateHttpManagementPayload(instanceId);
//instruct QueueTriggerFunction to wait for response
await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(new HttpRequestMessage(HttpMethod.Get, payload.StatusQueryGetUri), instanceId);
//response ready, get status
var status = await starter.GetStatusAsync(instanceId);
//act on status
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
//like completing the message
await messageReceiver.CompleteAsync(lockToken);
log.LogInformation($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} succeeded [MessageId={messageId}]");
}
else
{
//or deadletter the sob
await messageReceiver.DeadLetterAsync(lockToken);
log.LogError($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} failed [MessageId={messageId}]");
}
}
catch (Exception ex)
{
//not sure what went wrong, let the lock expire and try again (until max retry attempts is reached)
log.LogError(ex, $"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: handler failed [MessageId={messageId}]");
}
}
事实是,互联网上的所有示例都使用 HttpTrigger
并使用该触发器的 httprequest 来检查是否完成,但您没有使用 ServiceBusTrigger
。此外,我认为那是不正确的,您应该使用负载调用中的状态 uri,就像我在这里使用协调器函数的 instanceId 所做的那样。