您在编排方法中的何处执行 CallActivityAsync

Where do you do CallActivityAsync in orchestration method

我刚刚开始使用持久函数,需要一些关于如何正确执行扇出模式的建议。我有一个 FTP 服务器,我从那里读取所有文件。我想为每个文件启动一个 Activity 函数。据我了解,每次执行 Activity 函数时都会调用 orchestrator 函数。我只想读一次文件。为避免多次调用读取文件和启动 activity 函数的代码,推荐的方法是什么?它是否具有添加所有 activity 函数的 activity 函数,或者它是否使用 IsReplaying 属性,或其他不同的东西?

[FunctionName("OrchestrationMoveFilesToBlob")]
public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    var outputs = new List<string>();

    if (!context.IsReplaying)
    {
        // Do you call your database here and make a call to CallActivityAsync for each row?
    }

    // doing it here is properly very wrong as it will be called multiple times
    var tasks = new Task<string>[7];
    for (int i = 0; i < 7; i++)
    {
        tasks[i] = context.CallActivityAsync<string>("E2_CopyFileToBlob","");            }

    await Task.WhenAll(tasks);

    return outputs;
}

当查看下面 link 中的示例时,这实际上是直接在协调器函数中调用它吗?这不是很糟糕吗?它继续一次又一次地添加相同的活动.... ?

https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-cloud-backup

不确定我是否理解您想要实现的目标,但到目前为止您的代码看起来还不错。编排仅被调用一次(重放时可能会调用更多次,但这不是您的问题)。从您的编排中,您可以调用一个扇出所有 activity 函数(从 ftp 收集一个文件)每个 activity 函数一个文件。 await Task.WhenAll(tasks) 是你的粉丝。(你可以使用 List<Task> 而不是数组,如果你愿意,可以调用 .Add(task) 。为了不编辑你的代码,我将它复制到这里并添加一些评论和问题(请随时在此处编辑):

[FunctionName("OrchestrationMoveFilesToBlob")]
public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    var outputs = new List<string>();

    if (!context.IsReplaying)
    {
        // just needed for things that should not happen twice like logging....
    } 

    // if your work isn't a fixed list just call an activity 
    // which replies with the list of work here (e.g. list of filenames)

    var tasks = new Task<string>[7]; // can be a List<Task> too
    for (int i = 0; i < 7; i++)
    {
        tasks[i] = context.CallActivityAsync<string>("E2_CopyFileToBlob","");
    }

    await Task.WhenAll(tasks);

    return outputs; // currently an empty list. What do you want to give back?
}