手动 'approve' 子协调器重试 (Azure Durable Functions)
Manually 'approve' sub-orchestrator retry (Azure Durable Functions)
我正在使用 Azure Durable Functions 构建一个订单处理系统来逐步处理传入的订单。此过程包括执行由一个“主要”编排器处理的多个子编排。
为了切中要点,我简化了下面的例子
[FunctionName(nameof(RunMainOrchestrator))]
public async Task<ProcessedOrder> RunMainOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var customerOrder = context.GetInput<CustomerOrder>();
var processedOrder = new ProcessedOrder
{
Id = customerOrder.TransactionId,
customerOrder = customerOrder,
};
// Step 0: Create an initial record in the database
var incomingOrder = await context.CallActivityAsync<ProcessedOrder>("StoreBaseOrder", processedOrder);
// Step 1 of the order processing
var step1Result = await context.CallSubOrchestratorAsync<ProcessedOrder>("SubOrchestrator_1", incomingOrder);
// Step 2 of the order processing
var step2Result = await context.CallSubOrchestratorAsync"SubOrchestrator_2", step1Result);
// [Do more steps]
// Order processing finished
return processedOrder;
}
在某些情况下,其中一个子流程会出现问题。员工应该查看并手动批准重试或终止。
当前解
如果出现问题,子协调器等待外部事件(手动批准):
[FunctionName(nameof(SubOrchestrator_1))]
public async Task<ProcessedOrder> SubOrchestrator_1([OrchestrationTrigger] IDurableOrchestrationContext context)
{
// (...)
var input = new { Input = "Example" };
var processedResult = await context.CallActivityAsync<ProcessedOrder>("DoSomething", input);
if (processedResult.HasFailed)
{
// (...)
// Wait on an employee to investigate the issue
await context.WaitForExternalEvent("EmployeeApproval");
// I have to return the result instead of null.
// If I don't, the main orchestrator gets issues with chaining the result to the next sub-orchestrator.
return processedResult;
}
// Processing succesful.
return processedResult;
}
与此同时,一名员工查看错误并修复它并触发 HTTP 调用以引发“EmployeeApproval”事件。此事件接收失败的子协调器的 InstanceId
,因此它可以重新启动。
[FunctionName(nameof(EmployeeApprovalReceived))]
public async Task EmployeeApprovalReceived(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "order-processer/approve-proceeding")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient client
)
{
var approval = await req.Content.ReadAsAsync<EmployeeApproval>();
// First raise the event to the sub-orchestrator to stop waiting
await client.RaiseEventAsync(approval.SubOrchestratorInstanceId, "EmployeeApproval");
// Restart the sub-orchestrator with a new instance to keep track of previous attempts.
await client.RestartAsync(approval.SubOrchestratorInstanceId, restartWithNewInstanceId: true);
}
如您所见,我首先引发事件 然后 我重新启动 Sub Orchestrator。这是因为 return 语句 必须 在任何重启发生之前执行。否则,将抛出一个异常,指出该实例不能处于 运行 状态。
问题
从事件引发的那一刻起,子协调者'completes'。结果是 main 协调器继续使用 next 子协调器。现在,这看起来很正常,因为子协调器已经“完成”了。但实际上它失败了。下一个子协调器将失败,因为 returned null
来自前一个。
失败的子协调器的重启工作正常,但没有任何价值,因为 下一个 子协调器已经启动。
我想阻止它进入下一个子协调器,但是我不能使用内置的重试机制,因为我需要先批准重试。
问题
如何防止主协调器在失败的子协调器完成重试之前继续?什么是持久功能方面的“最佳实践”?
像这样的东西可以工作。
使用循环,我们可以在获得批准后重试任务。
我还添加了一个等待两个事件并查看接收到哪个事件的示例。
请注意此代码尚未经过测试:)
[FunctionName(nameof(SubOrchestrator_1))]
public async Task<ProcessedOrder> SubOrchestrator_1([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var retry = false;
ProcessedOrder processedResult;
do
{
// (...)
var input = new { Input = "Example" };
processedResult = await context.CallActivityAsync<ProcessedOrder>("DoSomething", input);
if (processedResult.HasFailed)
{
// (...)
// Wait on an employee to investigate the issue
var approvalTask = context.WaitForExternalEvent("EmployeeApproval");
var terminateTask = context.WaitForExternalEvent("EmployeeTerminate");
var winnerTask = await Task.WhenAny(approvalTask, terminateTask);
if (winnerTask == terminateTask)
{
// Return something to main orchestrator that indicates termination
return terminatedResult;
}
retry = true;
}
} while (retry);
// Processing succesful.
return processedResult;
}
我正在使用 Azure Durable Functions 构建一个订单处理系统来逐步处理传入的订单。此过程包括执行由一个“主要”编排器处理的多个子编排。
为了切中要点,我简化了下面的例子
[FunctionName(nameof(RunMainOrchestrator))]
public async Task<ProcessedOrder> RunMainOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var customerOrder = context.GetInput<CustomerOrder>();
var processedOrder = new ProcessedOrder
{
Id = customerOrder.TransactionId,
customerOrder = customerOrder,
};
// Step 0: Create an initial record in the database
var incomingOrder = await context.CallActivityAsync<ProcessedOrder>("StoreBaseOrder", processedOrder);
// Step 1 of the order processing
var step1Result = await context.CallSubOrchestratorAsync<ProcessedOrder>("SubOrchestrator_1", incomingOrder);
// Step 2 of the order processing
var step2Result = await context.CallSubOrchestratorAsync"SubOrchestrator_2", step1Result);
// [Do more steps]
// Order processing finished
return processedOrder;
}
在某些情况下,其中一个子流程会出现问题。员工应该查看并手动批准重试或终止。
当前解
如果出现问题,子协调器等待外部事件(手动批准):
[FunctionName(nameof(SubOrchestrator_1))]
public async Task<ProcessedOrder> SubOrchestrator_1([OrchestrationTrigger] IDurableOrchestrationContext context)
{
// (...)
var input = new { Input = "Example" };
var processedResult = await context.CallActivityAsync<ProcessedOrder>("DoSomething", input);
if (processedResult.HasFailed)
{
// (...)
// Wait on an employee to investigate the issue
await context.WaitForExternalEvent("EmployeeApproval");
// I have to return the result instead of null.
// If I don't, the main orchestrator gets issues with chaining the result to the next sub-orchestrator.
return processedResult;
}
// Processing succesful.
return processedResult;
}
与此同时,一名员工查看错误并修复它并触发 HTTP 调用以引发“EmployeeApproval”事件。此事件接收失败的子协调器的 InstanceId
,因此它可以重新启动。
[FunctionName(nameof(EmployeeApprovalReceived))]
public async Task EmployeeApprovalReceived(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "order-processer/approve-proceeding")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient client
)
{
var approval = await req.Content.ReadAsAsync<EmployeeApproval>();
// First raise the event to the sub-orchestrator to stop waiting
await client.RaiseEventAsync(approval.SubOrchestratorInstanceId, "EmployeeApproval");
// Restart the sub-orchestrator with a new instance to keep track of previous attempts.
await client.RestartAsync(approval.SubOrchestratorInstanceId, restartWithNewInstanceId: true);
}
如您所见,我首先引发事件 然后 我重新启动 Sub Orchestrator。这是因为 return 语句 必须 在任何重启发生之前执行。否则,将抛出一个异常,指出该实例不能处于 运行 状态。
问题
从事件引发的那一刻起,子协调者'completes'。结果是 main 协调器继续使用 next 子协调器。现在,这看起来很正常,因为子协调器已经“完成”了。但实际上它失败了。下一个子协调器将失败,因为 returned null
来自前一个。
失败的子协调器的重启工作正常,但没有任何价值,因为 下一个 子协调器已经启动。
我想阻止它进入下一个子协调器,但是我不能使用内置的重试机制,因为我需要先批准重试。
问题
如何防止主协调器在失败的子协调器完成重试之前继续?什么是持久功能方面的“最佳实践”?
像这样的东西可以工作。 使用循环,我们可以在获得批准后重试任务。 我还添加了一个等待两个事件并查看接收到哪个事件的示例。 请注意此代码尚未经过测试:)
[FunctionName(nameof(SubOrchestrator_1))]
public async Task<ProcessedOrder> SubOrchestrator_1([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var retry = false;
ProcessedOrder processedResult;
do
{
// (...)
var input = new { Input = "Example" };
processedResult = await context.CallActivityAsync<ProcessedOrder>("DoSomething", input);
if (processedResult.HasFailed)
{
// (...)
// Wait on an employee to investigate the issue
var approvalTask = context.WaitForExternalEvent("EmployeeApproval");
var terminateTask = context.WaitForExternalEvent("EmployeeTerminate");
var winnerTask = await Task.WhenAny(approvalTask, terminateTask);
if (winnerTask == terminateTask)
{
// Return something to main orchestrator that indicates termination
return terminatedResult;
}
retry = true;
}
} while (retry);
// Processing succesful.
return processedResult;
}