如何在 asp.net 中启用并发搜索远程目录

How to enable concurrency in asp.net for searching remote directories

我需要在一个 MVC.NET 应用程序中,从大量网络共享里搜索一组给定的文件。按顺序(串行)逐个搜索是可行的,但速度很慢。

我可以在控制台应用程序中使用 Parallel.ForEach,而且看起来运行良好;但据我所知,Parallel.ForEach 在 MVC.NET 中似乎行不通,推荐的做法是使用 async/await。

    /// <summary>
    /// Probes a fixed list of network shares, one after another, for a fixed
    /// set of file names and prints the number of hits and the elapsed time.
    /// </summary>
    static void SearchAll()
    {
        string[] shareRoots = { @"\share1\dir1", @"\share2\dir2", @"\share3\dir5" };
        string[] wanted = { "file.txt", "file2.txt", "file3.jpg", "file4.xml", "file5.zip" };
        var hits = new List<string>();
        var timer = System.Diagnostics.Stopwatch.StartNew();

        // Strictly sequential: each share is fully probed before the next one starts.
        for (var i = 0; i < shareRoots.Length; i++)
        {
            hits.AddRange(Search(shareRoots[i], wanted));
        }

        Console.WriteLine($"Found {hits.Count} files in {timer.Elapsed}");
    }

    /// <summary>
    /// Checks which of the given file names exist directly under a share.
    /// </summary>
    /// <param name="share">Directory that each file name is combined with.</param>
    /// <param name="files">File names to probe, in the order they should be reported.</param>
    /// <returns>The combined paths of the files that exist, in input order.</returns>
    static List<string> Search(string share, IEnumerable<string> files)
    {
        var matches = new List<string>();
        foreach (var name in files)
        {
            var candidate = Path.Combine(share, name);
            if (!File.Exists(candidate))
            {
                continue; // not present on this share
            }

            matches.Add(candidate);
        }

        return matches;
    }

我希望能够使用 async/await 在 MVC.NET 的 Controller Action 中搜索目录,但还没有成功。由于没有 File.ExistsAsync 或 EnumerateFilesAsync,我不确定包装这些同步调用以便同时搜索多个目录的最佳方法。由于该问题受网络/IO 限制,它似乎很适合使用 async/await。

Since there is no File.ExistsAsync or EnumerateFilesAsync, I'm not sure of the best way to wrap those synchronous calls to enable searching multiple directories. It seems like this problem is suited for async/await due to its network/IO-bound nature.

不幸的是,确实如此。这些都是基于 I/O 的操作,本应具有异步 API,但 Win32 API 并不支持这类目录操作的异步形式。奇怪的是,设备驱动层确实支持异步(即使是本地磁盘也是如此),所以底层支持其实都已具备;只是我们无法使用它。

Parallel.ForEach 在 ASP.NET 上应该是可以工作的;只是不推荐使用。这是因为它会干扰 ASP.NET 线程池的启发式算法。例如,如果您执行大型的 Parallel 操作,由于线程池耗尽,其他传入的请求可能需要等待更长时间才能得到处理。对此有一些缓解措施,比如将线程池的最小线程数设置为默认值加上您所设置的 MaxDegreeOfParallelism 值(并确保同一时间只运行一个 Parallel 操作)。或者,您可以将文件枚举拆分为单独的(私有)API 调用,使其运行在同一服务器上自己的 AppDomain 中,并拥有自己独立的线程池。

如果您经常查询,而网络共享不经常更新,您可以通过在网络共享中保留所有文件名的内存镜像来以内存换取速度,并查询此镜像而不是文件系统。您将需要多个 FileSystemWatcher 对象,每个网络共享一个。每次通知到达时都会产生一个任务,以枚举已更改目录的文件。通过这种方式,您可以获得 100 倍或更多的性能提升。

这是一个实现:

/// <summary>
/// Keeps an in-memory mirror of the file names found directly inside a set of
/// directories and refreshes that mirror whenever a
/// <see cref="FileSystemWatcher"/> reports a change, so that searches are
/// answered from memory instead of hitting the file system.
/// </summary>
/// <remarks>
/// At most one scan runs per directory at any time. A change that arrives while
/// a scan is running is remembered and triggers exactly one follow-up scan.
/// </remarks>
public class RemoteWatcher : IDisposable
{
    // Change notifications closer together than this are coalesced into one scan.
    private const int DebounceMilliseconds = 200;

    private readonly DirectoryData[] _ddArray;
    private readonly Task[] _initializingTasks;

    /// <summary>
    /// Starts watching every directory in <paramref name="shares"/> and kicks
    /// off the initial scan of each of them in parallel.
    /// </summary>
    /// <param name="shares">Directories to mirror.</param>
    public RemoteWatcher(string[] shares)
    {
        _ddArray = shares.Select(path =>
        {
            var dd = new DirectoryData();
            dd.Path = path;
            // The initial scan is started below; flag it before any event can
            // arrive so that early changes are queued instead of racing it.
            dd.InProgress = true;
            dd.Watcher = new FileSystemWatcher(path);
            dd.Watcher.Created += (s, e) => OnChangedAsync(dd);
            dd.Watcher.Renamed += (s, e) => OnChangedAsync(dd);
            dd.Watcher.Changed += (s, e) => OnChangedAsync(dd);
            dd.Watcher.Deleted += (s, e) => OnChangedAsync(dd);
            dd.Watcher.Error += (s, e) => OnChangedAsync(dd);
            // Enable only after all handlers are attached. Handlers capture the
            // entry itself, so they never depend on _ddArray being assigned yet.
            dd.Watcher.EnableRaisingEvents = true;
            return dd;
        }).ToArray();

        // Start processing all directories in parallel
        _initializingTasks = _ddArray.Select(ProcessDirectoryAsync).ToArray();
    }

    /// <summary>
    /// Handles a watcher notification: debounces it and then either starts a
    /// scan or, if one is already running, marks the directory for a re-scan.
    /// </summary>
    /// <remarks>
    /// <c>async void</c> because it is invoked directly from event handlers.
    /// </remarks>
    private async void OnChangedAsync(DirectoryData dd)
    {
        Task delayTask;
        lock (dd)
        {
            // A newer notification supersedes the pending one.
            var previous = dd.Cts;
            if (previous != null)
            {
                previous.Cancel();
                previous.Dispose();
            }
            dd.Cts = new CancellationTokenSource();
            delayTask = Task.Delay(DebounceMilliseconds, dd.Cts.Token);
        }
        try
        {
            // Workaround for changes firing twice
            await delayTask.ConfigureAwait(false);
        }
        catch (OperationCanceledException) // A new change occurred (or we were disposed)
        {
            return; // Let the new event continue
        }
        lock (dd)
        {
            if (dd.InProgress)
            {
                dd.HasChanged = true; // Let it finish and mark for restart
                return;
            }
            // Claim the scan while still holding the lock so that no second
            // scan of the same directory can be started concurrently.
            dd.InProgress = true;
        }
        // Start processing
        var fireAndForget = ProcessDirectoryAsync(dd);
    }

    /// <summary>
    /// Enumerates the directory on a thread-pool thread and publishes the
    /// resulting set of file names. The caller must already have set
    /// <see cref="DirectoryData.InProgress"/> to <c>true</c>.
    /// </summary>
    /// <returns>
    /// A task for this single scan; it faults if the enumeration throws.
    /// </returns>
    private Task ProcessDirectoryAsync(DirectoryData dd)
    {
        return Task.Run(() =>
        {
            HashSet<string> hash = null;
            try
            {
                var fileNames = Directory.EnumerateFiles(dd.Path).Select(Path.GetFileName);
                hash = new HashSet<string>(fileNames, StringComparer.OrdinalIgnoreCase);
            }
            finally
            {
                // Runs on failure too, so a failed scan cannot leave the
                // directory permanently flagged as "in progress".
                lock (dd)
                {
                    if (hash != null)
                    {
                        dd.FileNames = hash; // It is backed by a volatile field
                    }
                    if (dd.HasChanged)
                    {
                        // Keep InProgress set: the follow-up scan takes over.
                        dd.HasChanged = false;
                        var fireAndForget = ProcessDirectoryAsync(dd); // Restart
                    }
                    else
                    {
                        dd.InProgress = false;
                    }
                }
            }
        });
    }

    /// <summary>
    /// Returns the full paths of all requested file names that exist in any of
    /// the watched directories, according to the in-memory mirror.
    /// </summary>
    /// <param name="fileNames">File names to look up (case-insensitive).</param>
    /// <returns>
    /// One combined path per (directory, file name) hit; the file-name part is
    /// spelled as it was passed in.
    /// </returns>
    /// <remarks>
    /// Waits for the initial scan of every directory. If an initial scan
    /// failed, its exception is rethrown here.
    /// </remarks>
    public async Task<string[]> SearchAllAsync(params string[] fileNames)
    {
        // ConfigureAwait(false): the continuation does not need the caller's
        // context, and callers that block on the returned task would otherwise
        // risk a deadlock under a synchronization context.
        await Task.WhenAll(_initializingTasks).ConfigureAwait(false);
        return _ddArray.SelectMany(dd =>
        {
            var snapshot = dd.FileNames; // read once; a re-scan may swap it
            return fileNames.Where(f => snapshot.Contains(f))
                .Select(fileName => Path.Combine(dd.Path, fileName));
        }).ToArray();
    }

    /// <summary>
    /// Stops all watchers and cancels any pending debounced notification.
    /// </summary>
    public void Dispose()
    {
        foreach (var dd in _ddArray)
        {
            dd.Watcher.Dispose();
            lock (dd)
            {
                if (dd.Cts != null)
                {
                    dd.Cts.Cancel();
                    dd.Cts.Dispose();
                    dd.Cts = null;
                }
            }
        }
    }

    /// <summary>Per-directory state; the instance itself is used as the lock.</summary>
    private class DirectoryData
    {
        public string Path { get; set; }
        public FileSystemWatcher Watcher { get; set; }
        // True when a change arrived while a scan was running.
        public bool HasChanged { get; set; }
        // True while a scan is running or has been claimed.
        public bool InProgress { get; set; }
        private volatile HashSet<string> _fileNames;
        // Latest completed snapshot; null until the first scan succeeds.
        public HashSet<string> FileNames
        {
            get => _fileNames; set => _fileNames = value;
        }
        // Cancels the debounce delay of the most recent notification.
        public CancellationTokenSource Cts { get; set; }
    }
}

用法示例:

// One shared instance for the whole application.
public static RemoteWatcher RemoteWatcher1 {get; private set;}

// On application start
RemoteWatcher1 = new RemoteWatcher(new[] { @"\share1\dir1", @"\share2\dir2", @"\share3\dir5" });

// Search (from an async method, e.g. an async controller action).
// Await the task instead of blocking on .Result: blocking a request thread
// while the initial scans are still running can deadlock under a
// synchronization context and ties up a thread-pool thread.
var results = await RemoteWatcher1.SearchAllAsync(new[] { "file.txt", "file2.txt", "file3.jpg", "file4.xml", "file5.zip" });

// On application end
RemoteWatcher1.Dispose();

首次搜索会推迟到所有网络共享都处理完毕之后才执行。文件系统的更改不会阻塞后续搜索;在当前这次重新扫描完成之前,搜索使用的是先前的镜像数据,因此结果可能略微过时。