Azure Durable Functions 将扇出与函数链相结合

Azure Durable Functions Combining fan out with function chaining

我有一个持久函数应用程序,用于根据文件名 属性 以不同方式处理提交的项目。 Orchestrator 函数类似于下面的函数,尽管下面是一个简化的示例来说明我的场景。

基本上,我根据用户提交数据中文件名 属性 的扩展名进行不同的函数链接。

             InputData inputData = context.GetInput<InputData>();
        List<OutputData1> outputData1List = new List<OutputData1>();
        List<OutputData2> outputData2List = new List<OutputData2>();
        if (Path.GetExtension(inputData.FileName) == ".exa")
        {
            Func1Data func1Data = new Func1Data(inputData);
            outputData1List.Add(await context.CallActivityAsync<OutputData1>("Function1", func1Data));

            Func2Data func2Data = new Func2Data(inputData);
            outputData1List.Add(await context.CallActivityAsync<OutputData1>("Function2", func2Data));

            Func3Data func3Data = new Func3Data(inputData);
            outputData2List.Add(await context.CallActivityAsync<OutputData2>("Function3", func3Data));
        }
        if (Path.GetExtension(inputData.FileName) == ".exb")
        {
            Func2Data func2Data = new Func2Data(inputData);
            outputData1List.Add(await context.CallActivityAsync<OutputData1>("Function2", func2Data));

            Func3Data func3Data = new Func3Data(inputData);
            outputData2List.Add(await context.CallActivityAsync<OutputData2>("Function3", func3Data));
        }
        if (Path.GetExtension(inputData.FileName) == ".exc")
        {
            Func3Data func3Data = new Func3Data(inputData);
            outputData2List.Add(await context.CallActivityAsync<OutputData2>("Function3", func3Data));
        }
        if (Path.GetExtension(inputData.FileName) == ".exd")
        {
            Func4Data func4Data = new Func4Data(inputData);
            outputData2List.Add(await context.CallActivityAsync<OutputData2>("Function4", func4Data));
        }
        return new { output1 = outputData1List, output2 = outputData2List };

这适用于单个文件,但我现在还想向我的函数 App 添加第二个客户端函数,它接受一批包含文件输入数组的文件,并以相同的方式但同时处理它们.现在为了让批处理中的每个文件并发处理,我需要扇out/In。但我不确定如何在每个文件类型的基础上维护函数链接的同时实现这一点。下面让我更进一步,但我怀疑有更好的方法来实现这一目标。

        public static async Task<Object> RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        List<InputData> batchData = context.GetInput<List<InputData>>();
        List<OutputData1> outputData1List = new List<OutputData1>();
       // List<OutputData2> outputData2List = new List<OutputData2>();

        var concurrentTasks = new List<Task<outputData2>>();

        foreach (InputData inputData in batchData)
        {
            if (Path.GetExtension(inputData.FileName) == ".exa")
            {
                Func1Data func1Data = new Func1Data(inputData);
                outputData1List.Add(await context.CallActivityAsync<OutputData1>("Function1", func1Data));

                Func2Data func2Data = new Func2Data(inputData);
                outputData1List.Add(await context.CallActivityAsync<OutputData1>("Function2", func2Data));

                Func3Data func3Data = new Func3Data(inputData);
                concurrentTasks.Add(context.CallActivityAsync<OutputData2>("Function3", func3Data));
            }
            if (Path.GetExtension(inputData.FileName) == ".exb")
            {
                Func2Data func2Data = new Func2Data(inputData);
                outputData1List.Add(await context.CallActivityAsync<OutputData1>("Function2", func2Data));

                Func3Data func3Data = new Func3Data(inputData);
                concurrentTasks.Add(context.CallActivityAsync<OutputData2>("Function3", func3Data));
            }
            if (Path.GetExtension(inputData.FileName) == ".exc")
            {
                Func3Data func3Data = new Func3Data(inputData);
                concurrentTasks.Add(context.CallActivityAsync<OutputData2>("Function3", func3Data));
            }
            if (Path.GetExtension(inputData.FileName) == ".exd")
            {
                Func4Data func4Data = new Func4Data(inputData);
                concurrentTasks.Add(context.CallActivityAsync<OutputData2>("Function4", func4Data));
            }
        }
        var outputData2List = await Task.WhenAll(concurrentTasks);
        return new { output1 = outputData1List, output2 = outputData2List };
    }

我可以不按顺序处理每个链中的第一个函数,而是可以从每个函数链序列创建一个新任务,然后对其进行批处理吗?如果可以,谁能给我提供一个语法示例吗?

谢谢!

子业务流程有效。

    public static async Task<Object> BatchRunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        List<InputData> batchData = context.GetInput<List<InputData>>();
        var concurrentTasks = new List<Task<Object>>();
        foreach (InputData inputData in batchData)
        {
            concurrentTasks.Add(context.CallSubOrchestratorAsync<BlobFileCombinedFunctionResults>("OtherOrchestration", inputData));
        }
        var results = await Task.WhenAll(concurrentTasks);
        return results;
    }