外循环被多次调用,而在 Durable Function 中使用 Nested Loop 调用 TaskList 以实现扇出方法
Outer loop is being called more than once, while Calling TaskList in Durable Function with Nested Loop for fan-out approach
这是我的“Orchestrator”的简化代码:
[FunctionName("Orchestrator")]
public static async Task<bool> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
{
List<string> hashKeyList = new List<string>();
List<Task<CalcResult>> tasksList = new List<Task<CalcResult>>();
Dictionary<A, B> otherCalc = new Dictionary<A, B>();
List<FinalResult> finalResults = new List<FinalResult>();
foreach (var a in A)//LOOP A
{
foreach (var b in B)//LOOP B
{
var hashKey = doSomeThingOnAandB(a, b);
if (!hashKeyList.Contains(hashKey))
{
hashkeyList.Add(hashKey);
tasksList.Add(context.CallActivityAsync<CalcResult>
(nameof(CalculationActivity), (a, b)));
}
else
{
otherCalc.Add(a, b);
}
}
await Task.WhenAll(tasksList);
foreach (var task in tasksList)
{
var res = ((Task<caclResult>)task).Result;
finalResults.Add(res);
}
}
}
我正在尝试为我的持久函数创建一个嵌套循环。所有示例始终针对一个循环,在我的例子中,内部循环 (B) 工作正常。但是,外循环(A)重复了不止一次!
在我的场景中:
在某些情况下,a
和 b
组合是相等的。我正在创建一个“hashkey”,对对象 a
和 b
进行一些小的计算,如果已经提到组合,我会将它们插入 otherCalc
字典,否则它将被添加到 taskList
.
发生了什么:
所有 a
和 b
都插入到 otherCalc
。 “循环 A”重新启动,尽管 hashKeyList
保持其内容。这意味着不管之前运行的外循环(循环 A),它已经填充了 hashKeyList
,我看到所有 A 列表都被再次调用。
据我从代码中收集到的信息,您的外循环不应该一遍又一遍地重新启动,但您永远不会在读取结果后从 taskList 中删除已完成的任务:
foreach (var task in tasksList)
{
var res = ((Task<caclResult>)task).Result; //should be "CalcResult"?
finalResults.Add(res);
}
我怀疑你不见了
tasksList.Clear();
在此 foreach 循环之后或外循环开始时。
或者,您是否有特定原因必须在 for 循环中读取任务结果?为什么不在外循环完成后等待所有添加的任务呢?您不仅避免多次读取结果,而且我怀疑首先创建所有任务然后等待它们应该更快,因为您的线程不必多次等待最后一个线程,而是已经处理下一个任务。
outer foreach
{
inner foreach
{
...
}
...
}
await Task.WhenAll(tasksList);
foreach (var task in tasksList)
{
var res = ((Task<caclResult>)task).Result;
finalResults.Add(res);
}
当然,这个问题有不同的解决方案。我发现最好的一个是使用“Sub Orchestrator”将循环分成两部分。
[FunctionName("Orchestrator")]
public static async Task<bool> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
{
List<string> hashKeyList = new List<string>();
foreach (var a in A)//LOOP A
{
//CALL SUB ORCHESTRATOR
hashKeyList = await context.CallSubOrchestratorAsync<List<string>>
(nameof(SecondOrchestrator), (hashKeyList , a));
}
}
Sub Orchestrator 代码为:
[FunctionName("SecondOrchestrator")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
{
(List<string> HashkeyIdList,A a)
= context.GetInput<(List<string>, A)>();
List<Task<CalcResult>> tasksList = new List<Task<CalcResult>>();
Dictionary<A, B> otherCalc = new Dictionary<A, B>();
List<FinalResult> finalResults = new List<FinalResult>();
foreach (var b in B)//LOOP B
{
var hashKey = doSomeThingOnAandB(a, b);
if (!hashKeyList.Contains(hashKey))
{
hashkeyList.Add(hashKey);
tasksList.Add(context.CallActivityAsync<CalcResult>
(nameof(CalculationActivity), (a, b)));
}
else
{
otherCalc.Add(a, b);
}
}
await Task.WhenAll(tasksList);
foreach (var task in tasksList)
{
var res = ((Task<caclResult>)task).Result;
//DO WHAT IS NEEDED WITH FINALRESULTS OR RETURN TO MAIN ORCHESTRATOR
finalResults.Add(res);
}
return HashkeyIdList;
}
}
这是我的“Orchestrator”的简化代码:
[FunctionName("Orchestrator")]
public static async Task<bool> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
{
List<string> hashKeyList = new List<string>();
List<Task<CalcResult>> tasksList = new List<Task<CalcResult>>();
Dictionary<A, B> otherCalc = new Dictionary<A, B>();
List<FinalResult> finalResults = new List<FinalResult>();
foreach (var a in A)//LOOP A
{
foreach (var b in B)//LOOP B
{
var hashKey = doSomeThingOnAandB(a, b);
if (!hashKeyList.Contains(hashKey))
{
hashkeyList.Add(hashKey);
tasksList.Add(context.CallActivityAsync<CalcResult>
(nameof(CalculationActivity), (a, b)));
}
else
{
otherCalc.Add(a, b);
}
}
await Task.WhenAll(tasksList);
foreach (var task in tasksList)
{
var res = ((Task<caclResult>)task).Result;
finalResults.Add(res);
}
}
}
我正在尝试为我的持久函数创建一个嵌套循环。所有示例始终针对一个循环,在我的例子中,内部循环 (B) 工作正常。但是,外循环(A)重复了不止一次!
在我的场景中:
在某些情况下,a
和 b
组合是相等的。我正在创建一个“hashkey”,对对象 a
和 b
进行一些小的计算,如果已经提到组合,我会将它们插入 otherCalc
字典,否则它将被添加到 taskList
.
发生了什么:
所有 a
和 b
都插入到 otherCalc
。 “循环 A”重新启动,尽管 hashKeyList
保持其内容。这意味着不管之前运行的外循环(循环 A),它已经填充了 hashKeyList
,我看到所有 A 列表都被再次调用。
据我从代码中收集到的信息,您的外循环不应该一遍又一遍地重新启动,但您永远不会在读取结果后从 taskList 中删除已完成的任务:
foreach (var task in tasksList)
{
var res = ((Task<caclResult>)task).Result; //should be "CalcResult"?
finalResults.Add(res);
}
我怀疑你不见了
tasksList.Clear();
在此 foreach 循环之后或外循环开始时。
或者,您是否有特定原因必须在 for 循环中读取任务结果?为什么不在外循环完成后等待所有添加的任务呢?您不仅避免多次读取结果,而且我怀疑首先创建所有任务然后等待它们应该更快,因为您的线程不必多次等待最后一个线程,而是已经处理下一个任务。
outer foreach
{
inner foreach
{
...
}
...
}
await Task.WhenAll(tasksList);
foreach (var task in tasksList)
{
var res = ((Task<caclResult>)task).Result;
finalResults.Add(res);
}
当然,这个问题有不同的解决方案。我发现最好的一个是使用“Sub Orchestrator”将循环分成两部分。
[FunctionName("Orchestrator")]
public static async Task<bool> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
{
List<string> hashKeyList = new List<string>();
foreach (var a in A)//LOOP A
{
//CALL SUB ORCHESTRATOR
hashKeyList = await context.CallSubOrchestratorAsync<List<string>>
(nameof(SecondOrchestrator), (hashKeyList , a));
}
}
Sub Orchestrator 代码为:
[FunctionName("SecondOrchestrator")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger logger)
{
(List<string> HashkeyIdList,A a)
= context.GetInput<(List<string>, A)>();
List<Task<CalcResult>> tasksList = new List<Task<CalcResult>>();
Dictionary<A, B> otherCalc = new Dictionary<A, B>();
List<FinalResult> finalResults = new List<FinalResult>();
foreach (var b in B)//LOOP B
{
var hashKey = doSomeThingOnAandB(a, b);
if (!hashKeyList.Contains(hashKey))
{
hashkeyList.Add(hashKey);
tasksList.Add(context.CallActivityAsync<CalcResult>
(nameof(CalculationActivity), (a, b)));
}
else
{
otherCalc.Add(a, b);
}
}
await Task.WhenAll(tasksList);
foreach (var task in tasksList)
{
var res = ((Task<caclResult>)task).Result;
//DO WHAT IS NEEDED WITH FINALRESULTS OR RETURN TO MAIN ORCHESTRATOR
finalResults.Add(res);
}
return HashkeyIdList;
}
}