如何扩展天蓝色持久功能
how to scale azure durable funciton
所以我使用 Durable 函数完成 2 个任务。
我从 sftp 下载文件并将它们上传到 blob。我继续在字符串列表中添加这些文件的名称。
然后我将这个列表传递给另一个必须对这些文件执行计算的函数。
[FunctionName("MainOrch")]
public async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var fileUploaded = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
fileUploaded = await context.CallActivityAsync<List<string>>("SFTPDownloadAndUpload", null);
foreach (var fileName in fileUploaded)
{
await context.CallActivityAsync("PBARParsing", fileName);
}
return fileUploaded;
}
进行计算的解析函数是这样设置的
[FunctionName("PBARParsing")]
public async Task PBARParsing([ActivityTrigger] string name,
[Blob("pbar-staging/{name}", FileAccess.Read, Connection = "pbarBlobConnectionVault")] Stream myBlob,
ILogger log)
{
try
{
log.LogInformation("**********Starting" + name);
我的问题是它是否会扩展解析函数我的意思是如果给这个函数 10 个文件它会 运行 10 个解析函数实例每个文件一个还是我必须做点别的事吗?
在你现在的代码中,解析activity会运行一个接一个的串联。
这是因为您在循环内等待 activity 任务。
如果你想 运行 所有这些并行(fan-out fan-in 模式),你需要收集任务并立即等待它们:
[FunctionName("MainOrch")]
public async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var fileUploaded = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
fileUploaded = await context.CallActivityAsync<List<string>>("SFTPDownloadAndUpload", null);
var parseTasks = new List<Task>(fileUploaded.Count);
foreach (var fileName in fileUploaded)
{
var parseTask = context.CallActivityAsync("PBARParsing", fileName);
parseTasks.Add(parseTask);
}
await Task.WhenAll(parseTasks);
return fileUploaded;
}
所以我使用 Durable 函数完成 2 个任务。
我从 sftp 下载文件并将它们上传到 blob。我继续在字符串列表中添加这些文件的名称。
然后我将这个列表传递给另一个必须对这些文件执行计算的函数。
[FunctionName("MainOrch")] public async Task<List<string>> RunOrchestrator( [OrchestrationTrigger] IDurableOrchestrationContext context) { var fileUploaded = new List<string>(); // Replace "hello" with the name of your Durable Activity Function. fileUploaded = await context.CallActivityAsync<List<string>>("SFTPDownloadAndUpload", null); foreach (var fileName in fileUploaded) { await context.CallActivityAsync("PBARParsing", fileName); } return fileUploaded; }
进行计算的解析函数是这样设置的
[FunctionName("PBARParsing")]
public async Task PBARParsing([ActivityTrigger] string name,
[Blob("pbar-staging/{name}", FileAccess.Read, Connection = "pbarBlobConnectionVault")] Stream myBlob,
ILogger log)
{
try
{
log.LogInformation("**********Starting" + name);
我的问题是它是否会扩展解析函数我的意思是如果给这个函数 10 个文件它会 运行 10 个解析函数实例每个文件一个还是我必须做点别的事吗?
在你现在的代码中,解析activity会运行一个接一个的串联。 这是因为您在循环内等待 activity 任务。 如果你想 运行 所有这些并行(fan-out fan-in 模式),你需要收集任务并立即等待它们:
[FunctionName("MainOrch")]
public async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var fileUploaded = new List<string>();
// Replace "hello" with the name of your Durable Activity Function.
fileUploaded = await context.CallActivityAsync<List<string>>("SFTPDownloadAndUpload", null);
var parseTasks = new List<Task>(fileUploaded.Count);
foreach (var fileName in fileUploaded)
{
var parseTask = context.CallActivityAsync("PBARParsing", fileName);
parseTasks.Add(parseTask);
}
await Task.WhenAll(parseTasks);
return fileUploaded;
}