从 C# 中已有的 运行 任务中初始化新任务

Initializing a new Task from within an already running Task in C#

我正在编写一个程序,它可以搜索计算机的整个文件系统,以销毁任何符合特定参数的文件。我希望程序尽可能快地 运行 并利用尽可能多的资源来实现这一点(值得注意的是,在此过程进行期间,用户不应完成任何其他工作)。为此,我编写了一个方法,它获取一个目标目录,搜索其中的所有文件,然后为每个 child 目录排队一个新任务。目前这是通过将目录的路径传递到队列中来完成的,主线程监视该队列并使用它来实际初始化新任务,如下所示:

static class DriveHandler
{
    internal static readonly List<string> fixedDrives = GetFixedDrives();

    private static readonly ConcurrentQueue<string> _targetPathQueue = new ConcurrentQueue<string>();
    private static int _threadCounter = 0;

    internal static void WipeDrives()
    {
        foreach (string driveLetter in fixedDrives)
        {
            Interlocked.Increment(ref _threadCounter);
            Task.Run(() => WalkDrive(driveLetter));
        }
        while (Volatile.Read(ref _threadCounter) > 0 || !_targetPathQueue.IsEmpty)
        {
            if (_targetPathQueue.TryDequeue(out string path))
            {
                Interlocked.Increment(ref _threadCounter);
                Task.Run(() => WalkDrive(path));
            }
        }
    }

    private static void WalkDrive(string directory)
    {
        foreach (string file in Directory.GetFiles(directory))
        {
            //If file meets conditions, delete
        }
        string[] subDirectories = Directory.GetDirectories(directory);
        if (subDirectories.Length != 0)
        {
            foreach (string subDirectory in subDirectories)
            {
                _targetPathQueue.Enqueue(subDirectory);
            }
        }
        else { } //do other stuff;
        Interlocked.Decrement(ref _threadCounter);
    }
}

我的问题是,是否 safe/worth 只是从已经 运行ning 的任务中初始化新任务以避免浪费处理器时间监视队列?看起来像这样的东西:

static class DriveHandler
{
    internal static readonly List<string> fixedDrives = GetFixedDrives();

    private static int _threadCounter = 0;

    internal static void WipeDrives()
    {
        foreach (string driveLetter in fixedDrives)
        {
            Interlocked.Increment(ref _threadCounter);
            Task.Run(() => WalkDrive(driveLetter));
        }
        while (Volatile.Read(ref _threadCounter) > 0)
        {
            Thread.Sleep(5000);
        }
    }

    private static void WalkDrive(string directory)
    {
        foreach (string file in Directory.GetFiles(directory))
        {
            //If file meets conditions, delete
        }
        string[] subDirectories = Directory.GetDirectories(directory);
        if (subDirectories.Length != 0)
        {
            foreach (string subDirectory in subDirectories)
            {
                Interlocked.Increment(ref _threadCounter);
                Task.Run(() => WalkDrive(path));
            }
        }
        else { } //do other stuff;
        Interlocked.Decrement(ref _threadCounter);
    }
}

我当然需要每个任务一旦完成就死掉,这样做会使旧任务 parents 变成新任务并让它们存活直到所有 children 都完成?

非常感谢!

第一个问题:

Task.Run(() => WalkDrive(path));

这是一种一劳永逸的时尚,在这种情况下做这件事不是一件好事,为什么?因为有可能,你在硬盘上拥有的文件和路径比机器拥有的 CPU 和内存容量要多(任务消耗的内存也不仅仅是 CPU)。发射后忘记,因此得名,您不断产生任务而没有 awaiting 它们。

My question is, is it safe/worth it to just initialize the new tasks from within the already running tasks to avoid wasting processor time monitoring the queue?

这是有效的,没有什么可以阻止你这样做,但你已经在浪费资源了,为什么每次都产生新任务?您已经有一个 运行,只需将其设为一个长 运行 后台任务并保留它 运行,只需两个线程(我假设一个是(UI/user 面向)线程) 和一个做这项工作。 所有这些锁和任务产生都会损害您的性能并浪费所有资源 CPU + 内存分配。

如果你想通过并行执行来加速你可以把路径加到并发队列中,并且只有10-100个并发任务MAX之类的,至少你有一个上限,你控制多少代码并行执行。

虽然conccurent-queue不为空且没有人请求取消操作:

  1. 从基本路径开始
  2. 获取所有 sub-paths 并将它们排入 concurrent-queue
  3. 处理该路径中的文件
  4. 使当前基本路径成为队列中的下一个可用项目
  5. 重新开始。

您只需启动最大并发任务数即可。

您的主要 loop/while 条件类似于:

private async Task StartAsync(CancellationToken cancellationToken)
{
    var tasks = new List<Task>();

    for (int i = 0; i < MaxConcurrentTasks; i++)
    {
        tasks.Add(Task.Run(() => ProcessPath(initialPathHere), cancellationToken));
    }

    await Task.WhenAll(tasks);
}

然后沿着这些线:

private static async Task ProcessPath(string path, CancellationToken cancellationToken)
{
    while(concurrentDictionary.Count > 0 && !cancellationToken.IsCancellationRequested)
    {
        foreach(var subPath in System.IO.Directory.EnumerateDirectories(path))
        {
            //Enqueue the subPath into the concurrent dictionary
        }

        //Once finished, process files in the current path
        
        foreach (var file in path)
        {
        }

        path = concurrentDictionary.Dequeue();
    }
}

还没有检查过语法,但我认为这是一个好的算法应该做的。此外,请记住,当任务完成其当前作业时,此行中的队列可能为空,因此请相应地修改该代码。

path = concurrentDictionary.Dequeue();

最后的笔记:

  1. 考虑任务与并行之间的权衡。Invok/execute
  2. 考虑使用 BackgroundServices 它们 fine-tuned 很长 运行,取决于您的代码和要求
  3. 为了提高性能,请记住黄金法则 measure early。从测量开始,手头有一些指标,所以如果你想加快速度,你至少知道你现在能做多少,所以你重构并再次测量并比较,然后你就会知道你是否离你的目标越来越近或越来越远。
  4. 确保你 conccurency/paralell 处理得当,否则它只会对你不利。