外循环被多次调用,而在 Durable Function 中使用 Nested Loop 调用 TaskList 以实现扇出方法

Outer loop is being called more than once, while Calling TaskList in Durable Function with Nested Loop for fan-out approach

这是我的“Orchestrator”的简化代码:

  [FunctionName("Orchestrator")]
    public static async Task<bool> RunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
    {

        List<string> hashKeyList = new List<string>();
        List<Task<CalcResult>> tasksList = new List<Task<CalcResult>>();
        Dictionary<A, B> otherCalc = new Dictionary<A, B>();
        List<FinalResult> finalResults = new List<FinalResult>();

        foreach (var a in A)//LOOP A
        {
            foreach (var b in B)//LOOP B
            {
                var hashKey = doSomeThingOnAandB(a, b);
            if (!hashKeyList.Contains(hashKey))
                {
                    hashkeyList.Add(hashKey);
                    tasksList.Add(context.CallActivityAsync<CalcResult>
                        (nameof(CalculationActivity), (a, b)));
                }
            else
                {
                    otherCalc.Add(a, b);
                }
            }
            await Task.WhenAll(tasksList);

            foreach (var task in tasksList)
            {
                var res = ((Task<caclResult>)task).Result;

                finalResults.Add(res);
            }
        }
    }

我正在尝试为我的持久函数创建一个嵌套循环。所有示例始终针对一个循环,在我的例子中,内部循环 (B) 工作正常。但是,外循环(A)重复了不止一次!

在我的场景中: 在某些情况下,ab 组合是相等的。我正在创建一个“hashkey”,对对象 ab 进行一些小的计算,如果已经提到组合,我会将它们插入 otherCalc 字典,否则它将被添加到 taskList.

发生了什么: 所有 ab 都插入到 otherCalc。 “循环 A”重新启动,尽管 hashKeyList 保持其内容。这意味着不管之前运行的外循环(循环 A),它已经填充了 hashKeyList,我看到所有 A 列表都被再次调用。

据我从代码中收集到的信息,您的外循环不应该一遍又一遍地重新启动,但您永远不会在读取结果后从 taskList 中删除已完成的任务:

foreach (var task in tasksList)
{
    var res = ((Task<caclResult>)task).Result; //should be "CalcResult"?
    finalResults.Add(res);
}

我怀疑你不见了

tasksList.Clear();

在此 foreach 循环之后或外循环开始时。

或者,您是否有特定原因必须在 for 循环中读取任务结果?为什么不在外循环完成后等待所有添加的任务呢?您不仅避免多次读取结果,而且我怀疑首先创建所有任务然后等待它们应该更快,因为您的线程不必多次等待最后一个线程,而是已经处理下一个任务。

outer foreach
{
    inner foreach
    {
    ...
    }
...
}

await Task.WhenAll(tasksList);

foreach (var task in tasksList)
{
    var res = ((Task<caclResult>)task).Result;

    finalResults.Add(res);
}

当然,这个问题有不同的解决方案。我发现最好的一个是使用“Sub Orchestrator”将循环分成两部分。

[FunctionName("Orchestrator")]
public static async Task<bool> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
{

    List<string> hashKeyList = new List<string>();
  

    foreach (var a in A)//LOOP A
    {
       //CALL SUB ORCHESTRATOR

         hashKeyList = await context.CallSubOrchestratorAsync<List<string>>
                (nameof(SecondOrchestrator), (hashKeyList , a));
      }
   }

Sub Orchestrator 代码为:

 [FunctionName("SecondOrchestrator")]
    public static async Task<List<string>> RunOrchestrator(
       [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
    {
        (List<string> HashkeyIdList,A a)
         = context.GetInput<(List<string>, A)>();
        List<Task<CalcResult>> tasksList = new List<Task<CalcResult>>();
    Dictionary<A, B> otherCalc = new Dictionary<A, B>();
    List<FinalResult> finalResults = new List<FinalResult>();

     foreach (var b in B)//LOOP B
        {
            var hashKey = doSomeThingOnAandB(a, b);
        if (!hashKeyList.Contains(hashKey))
            {
                hashkeyList.Add(hashKey);
                tasksList.Add(context.CallActivityAsync<CalcResult>
                    (nameof(CalculationActivity), (a, b)));
            }
        else
            {
                otherCalc.Add(a, b);
            }
        }
        await Task.WhenAll(tasksList);

        foreach (var task in tasksList)
        {
            var res = ((Task<caclResult>)task).Result;
          //DO WHAT IS NEEDED WITH FINALRESULTS OR RETURN TO MAIN ORCHESTRATOR

            finalResults.Add(res); 
        }
      return HashkeyIdList;
    }
  }