Azure Durable 功能在下载文件后从本地存储中删除文件

Azure Durable function removes files form local storage after it is downloaded

我为完成这项任务而苦苦挣扎。我必须从 SFTP 下载文件然后解析它们。我正在使用这样的持久函数

        [FunctionName("MainOrch")]
    public async Task<List<string>> RunOrchestrator(
         [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
    {
        try
        {
            var filesDownloaded = new List<string>();
            var filesUploaded = new List<string>();
            var files = await context.CallActivityAsync<List<string>>("SFTPGetListOfFiles", null);
            log.LogInformation("!!!!FilesFound*******!!!!!" + files.Count);
            if (files.Count > 0)
            {
                foreach (var fileName in files)
                {
                    filesDownloaded.Add(await context.CallActivityAsync<string>("SFTPDownload", fileName));
                }
      

                var parsingTasks = new List<Task<string>>(filesDownloaded.Count);
                foreach (var downlaoded in filesDownloaded)
                {
                    var parsingTask = context.CallActivityAsync<string>("PBARParsing", downlaoded);
                    parsingTasks.Add(parsingTask);
                }
                await Task.WhenAll(parsingTasks);
            }
            return filesDownloaded;
        }
        catch (Exception ex)
        {

            throw;
        }

    }

SFTPGetListOfFiles:此函数连接到 SFTP 并获取文件夹中的文件列表和 return。

SFTPDownload:此函数假设连接到 SFTP 并下载 Azure Function 的 Tempt Storage 中的每个文件。和 return 下载路径。 (每个文件从 10 到 60 MB)

        [FunctionName("SFTPDownload")]
    public async Task<string> SFTPDownload([ActivityTrigger] string name, ILogger log, Microsoft.Azure.WebJobs.ExecutionContext context)
    {
        var downloadPath = "";
        try
        {
            using (var session = new Session())
            {
                try
                {
                    session.ExecutablePath = Path.Combine(context.FunctionAppDirectory, "winscp.exe");
                    session.Open(GetOptions(context));
                    log.LogInformation("!!!!!!!!!!!!!!Connected For Download!!!!!!!!!!!!!!!");

                    TransferOptions transferOptions = new TransferOptions();
                    transferOptions.TransferMode = TransferMode.Binary;
                    downloadPath = Path.Combine(Path.GetTempPath(), name);
                    log.LogInformation("Downloading " + name);
                    var transferResult = session.GetFiles("/Receive/" + name, downloadPath, false, transferOptions);
                    log.LogInformation("Downloaded " + name);
                    // Throw on any error
                    transferResult.Check();
                    log.LogInformation("!!!!!!!!!!!!!!Completed Download !!!!!!!!!!!!!!!!");
                }
                catch (Exception ex)
                {
                    log.LogError(ex.Message);
                }
                finally
                {
                    session.Close();
                }
            }
        }
        catch (Exception ex)
        {
            log.LogError(ex.Message);
            _traceService.TraceException(ex);
        }
        return downloadPath;
    }

PBARParsing:函数必须获取该文件的流并对其进行处理(处理一个 60 MB 的文件可能需要几分钟时间来扩展 S2 并向外扩展 10 个实例。)

        [FunctionName("PBARParsing")]
    public async Task PBARParsing([ActivityTrigger] string pathOfFile,
    ILogger log)
    {

        var theSplit = pathOfFile.Split("\");
        var name = theSplit[theSplit.Length - 1];
        try
        {
            log.LogInformation("**********Starting" + name);
            Stream stream = File.OpenRead(pathOfFile);

我希望使用 SFTPDownload 完成所有文件的下载,这就是“等待”处于循环中的原因。然后我想并行解析为 运行。

问题 1:MainOrch 函数中的代码是否正确地执行了这 3 件事 1) 获取文件的名称,2) 一个接一个地下载它们并且在所有文件下载完成之前不启动解析功能。然后 3) 并行解析文件。 ?

我观察到我在问题 1 中提到的内容按预期工作。

问题 2:30% 的文件已被解析,对于 80% 的文件,我看到错误“找不到文件 'D:\local\Temp\fileName'”是 azure 函数在我放置文件后删除文件吗?我可以采取其他方法吗?如果我将路径更改为“D:\home”,我可能会看到“另一个进程正在使用文件”错误。但我还没有尝试过。在 SFTP 上奇怪地输出了 68 个文件,最后 20 个 运行 和前 40 个文件在该路径中找不到,这是按顺序排列的。

问题 3:我还看到此错误“Blob 'func-eres-integration-dev/host' 的单例锁续订失败,错误代码为 409:LeaseIdMismatchWithLeaseOperation。最后一次成功续订完成于 2020-08-08T17:57:10.494Z(46005 毫秒)之前),持续时间为 155 毫秒。租期为 15000 毫秒。”它说明了什么吗?不过只来过一次。

更新 使用“D:\home”后我没有收到文件未找到错误

对于遇到此问题的其他人,临时存储是函数应用实例的本地存储,当函数横向扩展时会有所不同。

对于这种情况,D:\home 是更好的选择,因为 Azure 文件安装在此处,这在所有实例中都是相同的。

至于这里观察到的锁更新错误,这个issue tracks it but shouldn't cause issues提到过。如果您确实因此而发现任何问题,最好在该问题中分享详细信息。