您在编排方法中的何处执行 CallActivityAsync

Where do you do CallActivityAsync in orchestration method

我刚刚开始使用持久函数,需要一些关于如何正确执行扇出模式的建议。我有一个 FTP 服务器,我从那里读取所有文件。我想为每个文件启动一个 Activity 函数。据我了解,每次执行 Activity 函数时都会调用 orchestrator 函数。我只想读一次文件。为避免多次调用读取文件和启动 activity 函数的代码,推荐的方法是什么?它是否具有添加所有 activity 函数的 activity 函数,或者它是否使用 IsReplaying 属性,或其他不同的东西?

public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
    var outputs = new List<string>();

    if (!context.IsReplaying)
        // Do you call your database here and make a call to CallActivityAsync for each row?

    // doing it here is properly very wrong as it will be called multiple times
    var tasks = new Task<string>[7];
    for (int i = 0; i < 7; i++)
        tasks[i] = context.CallActivityAsync<string>("E2_CopyFileToBlob","");            }

    await Task.WhenAll(tasks);

    return outputs;

当查看下面 link 中的示例时,这实际上是直接在协调器函数中调用它吗?这不是很糟糕吗?它继续一次又一次地添加相同的活动.... ?


不确定我是否理解您想要实现的目标,但到目前为止您的代码看起来还不错。编排仅被调用一次(重放时可能会调用更多次,但这不是您的问题)。从您的编排中,您可以调用一个扇出所有 activity 函数(从 ftp 收集一个文件)每个 activity 函数一个文件。 await Task.WhenAll(tasks) 是你的粉丝。(你可以使用 List<Task> 而不是数组,如果你愿意,可以调用 .Add(task) 。为了不编辑你的代码,我将它复制到这里并添加一些评论和问题(请随时在此处编辑):

public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
    var outputs = new List<string>();

    if (!context.IsReplaying)
        // just needed for things that should not happen twice like logging....

    // if your work isn't a fixed list just call an activity 
    // which replies with the list of work here (e.g. list of filenames)

    var tasks = new Task<string>[7]; // can be a List<Task> too
    for (int i = 0; i < 7; i++)
        tasks[i] = context.CallActivityAsync<string>("E2_CopyFileToBlob","");

    await Task.WhenAll(tasks);

    return outputs; // currently an empty list. What do you want to give back?