我可以在持久函数中使用 foreach 循环吗?

Can I use a foreach loop in a durable function?

我已经编写了我的第一个持久函数,我想知道是否可以/好的做法在协调器函数中使用 foreach 循环?

编排中的第一个 activity returns 项目 ID 列表,我想遍历列表并使用子编排为每个项目 ID 执行一系列活动。

我创建了一个测试函数,它似乎可以工作。我观察到的唯一行为是,每次 orchestrator 重放并到达 foreach 循环时,它都会遍历整个列表,直到到达当前项目,然后执行活动。

如有任何建议/意见,我们将不胜感激。

谢谢

看看你的例子,这是非常标准的 Fan-out/Fan-in 案例。您可以 运行 并行活动的循环,但要确保您是异步进行的。您可以在此处找到用例和示例。

https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-concepts#fan-in-out

基于评论

这正是 Orchester 的工作方式。编排使用事件溯源模式。当 Orchestrator 安排 activity 时,它会进入睡眠状态,当 activity 完成时,它会醒来。每次 orchestrator 醒来时,它总是会从头开始重播,并会检查执行历史以查看它是否已经完成了给定的 activity 并继续前进。因此,在循环的情况下,它将安排所有活动并进入睡眠状态,当醒来时它会从头开始重放以查看它是否已完成其任务。我强烈建议您观看来自 Microsoft 的 Jeff hollan 的以下剪辑,我相信看完之后您会有非常清晰的想法。

How Orchestration works

只要编排中的代码是确定性的,就可以了。 docs.

中有关代码约束的更多信息

您提到您使用 activity 函数检索这些 ID。只要您使用相同的参数调用 functions/suborchestrations 就应该没问题,因为在回放期间 Durable Functions 会识别出该函数之前已被调用,并将 return 持久输出(因此不会 re-execute同样的功能)。

在持久函数中管理 foreach 的一个关键概念,无论是函数链接还是 Fan-in/Fan-out 是要迭代的数据是 return 从 编辑的Activity 并且每个数据项的处理也在 Activity.

的上下文中执行

This pattern will ensure that your logic is deterministic, don't rely on the NonDeterministicOrchestrationException as your proof that the logic is deterministic, that is commonly raised when a replay operation sends a different input than was expected and may not directly or initially inform you of non-deterministric logic.

任何对数据库或外部服务或其他 http 端点的调用都应被视为不确定的,因此将代码包装在 Activity 中。这样,当 Orchestrator 重放时,它将检索先前完成的对 activity 的调用的结果,形成底层存储。

  1. This can help improve performance if the logic in the activity is only evaluated once for the durable lifetime.
  2. This will also protect you from transient errors that may ocurr if during a replay attempt the underlying provider may be momentarily unavailable.

在下面的简单示例中,我们有一个需要在许多设施上执行的翻转功能,我们可以使用 fan-out 来同时执行每个设施的单独任务,或者按顺序链接:

[FunctionName("RolloverBot")]
public static async Task<bool> RolloverBot(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // example of how to get data to iterate against in a determinist paradigm
    var facilityIds = await context.CallActivityAsync<int[]>(nameof(GetAllFacilities), null);

    #region Fan-Out
    var tasks = new List<Task>();
    foreach (var facilityId in facilityIds)
    {
        tasks.Add(context.CallActivityAsync(nameof(RolloverFacility), facilityId));
    }
    
    // Fan back in ;)
    await Task.WhenAll(tasks);
    #endregion Fan-Out
  
    #region Chaining / Iterating Sequentially

    foreach (var facilityId in facilityIds)
    {
        await context.CallActivityAsync(nameof(RolloverFacility), facilityId);
    }

    #endregion Chaining / Iterating Sequentially

    return true;
}

/// <summary>
/// Return a list of all FacilityIds to operate on
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
[FunctionName("GetAllFacilities")]
public static async Task<int[]> GetAllFacilities([ActivityTrigger] IDurableActivityContext context)
{
    var db = await Globals.GetDataContext();
    var data = await db.Facilities.AddQueryOption("$select", "Id").ExecuteAsync();
    return data.Where(x => x.Id.HasValue).Distinct().Select(x => x.Id.Value).ToArray();
}

[FunctionName("RolloverFacility")]
public static async Task<bool> RolloverFacility(
    [ActivityTrigger] IDurableActivityContext context)
{
    int facilityId = context.GetInput<int>();
    bool result = false;

    ... insert rollover logic here

    result = true;
    return result;
}

这样,即使您的Activity逻辑使用System.RandomGuid.CreateNewDateTimeOffset.Now来确定facilityIds 到 return,持久函数本身仍被视为 确定性 并将正确重播。

作为一项规则,如果您的 activity 逻辑是时间依赖性,因为它使逻辑更加明显,即 Orchestrator 实际上在控制任务,而不是相反,而且由于 调度 [=40= 之间的函数实现,可能会有微小的滞后时间] CallActivityAsync 及其实际 执行