手动 'approve' 子协调器重试 (Azure Durable Functions)

Manually 'approve' sub-orchestrator retry (Azure Durable Functions)

我正在使用 Azure Durable Functions 构建一个订单处理系统来逐步处理传入的订单。此过程包括执行由一个“主要”编排器处理的多个子编排。

为了切中要点,我简化了下面的例子

[FunctionName(nameof(RunMainOrchestrator))]
public async Task<ProcessedOrder> RunMainOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var customerOrder = context.GetInput<CustomerOrder>();

    var processedOrder = new ProcessedOrder
    {
        Id = customerOrder.TransactionId,
        customerOrder = customerOrder,
    };

    // Step 0: Create an initial record in the database
    var incomingOrder = await context.CallActivityAsync<ProcessedOrder>("StoreBaseOrder", processedOrder);

    // Step 1 of the order processing 
    var step1Result = await context.CallSubOrchestratorAsync<ProcessedOrder>("SubOrchestrator_1", incomingOrder);

    // Step 2 of the order processing 
    var step2Result = await context.CallSubOrchestratorAsync"SubOrchestrator_2", step1Result);

    // [Do more steps]

    // Order processing finished

    return processedOrder;
}

在某些情况下,其中一个子流程会出现问题。员工应该查看并手动批准重试或终止。

当前解

如果出现问题,子协调器等待外部事件(手动批准):

[FunctionName(nameof(SubOrchestrator_1))]
public async Task<ProcessedOrder> SubOrchestrator_1([OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // (...)

    var input = new { Input = "Example" };
    var processedResult = await context.CallActivityAsync<ProcessedOrder>("DoSomething", input);

    if (processedResult.HasFailed)
    {
        // (...)

        // Wait on an employee to investigate the issue
        await context.WaitForExternalEvent("EmployeeApproval");

        // I have to return the result instead of null. 
        // If I don't, the main orchestrator gets issues with chaining the result to the next sub-orchestrator.
        return processedResult;
    }

    // Processing succesful.
    return processedResult;
}

与此同时,一名员工查看错误并修复它并触发 HTTP 调用以引发“EmployeeApproval”事件。此事件接收失败的子协调器的 InstanceId,因此它可以重新启动。

[FunctionName(nameof(EmployeeApprovalReceived))]
public async Task EmployeeApprovalReceived(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "order-processer/approve-proceeding")] HttpRequestMessage req,
    [DurableClient] IDurableOrchestrationClient client
)
{
    var approval = await req.Content.ReadAsAsync<EmployeeApproval>();

    // First raise the event to the sub-orchestrator to stop waiting
    await client.RaiseEventAsync(approval.SubOrchestratorInstanceId, "EmployeeApproval");

    // Restart the sub-orchestrator with a new instance to keep track of previous attempts.
    await client.RestartAsync(approval.SubOrchestratorInstanceId, restartWithNewInstanceId: true);
}

如您所见,我首先引发事件 然后 我重新启动 Sub Orchestrator。这是因为 return 语句 必须 在任何重启发生之前执行。否则,将抛出一个异常,指出该实例不能处于 运行 状态。

问题

从事件引发的那一刻起,子协调者'completes'。结果是 main 协调器继续使用 next 子协调器。现在,这看起来很正常,因为子协调器已经“完成”了。但实际上它失败了。下一个子协调器将失败,因为 returned null 来自前一个。

失败的子协调器的重启工作正常,但没有任何价值,因为 下一个 子协调器已经启动。

我想阻止它进入下一个子协调器,但是我不能使用内置的重试机制,因为我需要先批准重试。


问题

如何防止主协调器在失败的子协调器完成重试之前继续?什么是持久功能方面的“最佳实践”?

像这样的东西可以工作。 使用循环,我们可以在获得批准后重试任务。 我还添加了一个等待两个事件并查看接收到哪个事件的示例。 请注意此代码尚未经过测试:)

[FunctionName(nameof(SubOrchestrator_1))]
public async Task<ProcessedOrder> SubOrchestrator_1([OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var retry = false;
    ProcessedOrder processedResult;
    do
    {
        // (...)

        var input = new { Input = "Example" };
        processedResult = await context.CallActivityAsync<ProcessedOrder>("DoSomething", input);

        if (processedResult.HasFailed)
        {
            // (...)

            // Wait on an employee to investigate the issue
            var approvalTask = context.WaitForExternalEvent("EmployeeApproval");
            var terminateTask = context.WaitForExternalEvent("EmployeeTerminate");
            var winnerTask = await Task.WhenAny(approvalTask, terminateTask);

            if (winnerTask == terminateTask)
            {
                // Return something to main orchestrator that indicates termination
                return terminatedResult;
            }
            
            retry = true;
        }
    } while (retry);
    
    // Processing succesful.
    return processedResult;
}