您在编排方法中的何处执行 CallActivityAsync
Where do you do CallActivityAsync in orchestration method
我刚刚开始使用持久函数,需要一些关于如何正确执行扇出模式的建议。我有一个 FTP 服务器,我从那里读取所有文件。我想为每个文件启动一个 Activity 函数。据我了解,每次执行 Activity 函数时都会调用 orchestrator 函数。我只想读一次文件。为避免多次调用读取文件和启动 activity 函数的代码,推荐的方法是什么?它是否具有添加所有 activity 函数的 activity 函数,或者它是否使用 IsReplaying 属性,或其他不同的东西?
[FunctionName("OrchestrationMoveFilesToBlob")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
var outputs = new List<string>();
if (!context.IsReplaying)
{
// Do you call your database here and make a call to CallActivityAsync for each row?
}
// doing it here is properly very wrong as it will be called multiple times
var tasks = new Task<string>[7];
for (int i = 0; i < 7; i++)
{
tasks[i] = context.CallActivityAsync<string>("E2_CopyFileToBlob",""); }
await Task.WhenAll(tasks);
return outputs;
}
当查看下面 link 中的示例时,这实际上是直接在协调器函数中调用它吗?这不是很糟糕吗?它继续一次又一次地添加相同的活动.... ?
https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-cloud-backup
不确定我是否理解您想要实现的目标,但到目前为止您的代码看起来还不错。编排仅被调用一次(重放时可能会调用更多次,但这不是您的问题)。从您的编排中,您可以调用一个扇出所有 activity 函数(从 ftp 收集一个文件)每个 activity 函数一个文件。 await Task.WhenAll(tasks)
是你的粉丝。(你可以使用 List<Task>
而不是数组,如果你愿意,可以调用 .Add(task)
。为了不编辑你的代码,我将它复制到这里并添加一些评论和问题(请随时在此处编辑):
[FunctionName("OrchestrationMoveFilesToBlob")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
var outputs = new List<string>();
if (!context.IsReplaying)
{
// just needed for things that should not happen twice like logging....
}
// if your work isn't a fixed list just call an activity
// which replies with the list of work here (e.g. list of filenames)
var tasks = new Task<string>[7]; // can be a List<Task> too
for (int i = 0; i < 7; i++)
{
tasks[i] = context.CallActivityAsync<string>("E2_CopyFileToBlob","");
}
await Task.WhenAll(tasks);
return outputs; // currently an empty list. What do you want to give back?
}
我刚刚开始使用持久函数,需要一些关于如何正确执行扇出模式的建议。我有一个 FTP 服务器,我从那里读取所有文件。我想为每个文件启动一个 Activity 函数。据我了解,每次执行 Activity 函数时都会调用 orchestrator 函数。我只想读一次文件。为避免多次调用读取文件和启动 activity 函数的代码,推荐的方法是什么?它是否具有添加所有 activity 函数的 activity 函数,或者它是否使用 IsReplaying 属性,或其他不同的东西?
[FunctionName("OrchestrationMoveFilesToBlob")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
var outputs = new List<string>();
if (!context.IsReplaying)
{
// Do you call your database here and make a call to CallActivityAsync for each row?
}
// doing it here is properly very wrong as it will be called multiple times
var tasks = new Task<string>[7];
for (int i = 0; i < 7; i++)
{
tasks[i] = context.CallActivityAsync<string>("E2_CopyFileToBlob",""); }
await Task.WhenAll(tasks);
return outputs;
}
当查看下面 link 中的示例时,这实际上是直接在协调器函数中调用它吗?这不是很糟糕吗?它继续一次又一次地添加相同的活动.... ?
https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-cloud-backup
不确定我是否理解您想要实现的目标,但到目前为止您的代码看起来还不错。编排仅被调用一次(重放时可能会调用更多次,但这不是您的问题)。从您的编排中,您可以调用一个扇出所有 activity 函数(从 ftp 收集一个文件)每个 activity 函数一个文件。 await Task.WhenAll(tasks)
是你的粉丝。(你可以使用 List<Task>
而不是数组,如果你愿意,可以调用 .Add(task)
。为了不编辑你的代码,我将它复制到这里并添加一些评论和问题(请随时在此处编辑):
[FunctionName("OrchestrationMoveFilesToBlob")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
var outputs = new List<string>();
if (!context.IsReplaying)
{
// just needed for things that should not happen twice like logging....
}
// if your work isn't a fixed list just call an activity
// which replies with the list of work here (e.g. list of filenames)
var tasks = new Task<string>[7]; // can be a List<Task> too
for (int i = 0; i < 7; i++)
{
tasks[i] = context.CallActivityAsync<string>("E2_CopyFileToBlob","");
}
await Task.WhenAll(tasks);
return outputs; // currently an empty list. What do you want to give back?
}