从 C# 中已有的 运行 任务中初始化新任务
Initializing a new Task from within an already running Task in C#
我正在编写一个程序,它可以搜索计算机的整个文件系统,以销毁任何符合特定参数的文件。我希望程序尽可能快地 运行 并利用尽可能多的资源来实现这一点(值得注意的是,在此过程进行期间,用户不应完成任何其他工作)。为此,我编写了一个方法,它获取一个目标目录,搜索其中的所有文件,然后为每个 child 目录排队一个新任务。目前这是通过将目录的路径传递到队列中来完成的,主线程监视该队列并使用它来实际初始化新任务,如下所示:
static class DriveHandler
{
internal static readonly List<string> fixedDrives = GetFixedDrives();
private static readonly ConcurrentQueue<string> _targetPathQueue = new ConcurrentQueue<string>();
private static int _threadCounter = 0;
internal static void WipeDrives()
{
foreach (string driveLetter in fixedDrives)
{
Interlocked.Increment(ref _threadCounter);
Task.Run(() => WalkDrive(driveLetter));
}
while (Volatile.Read(ref _threadCounter) > 0 || !_targetPathQueue.IsEmpty)
{
if (_targetPathQueue.TryDequeue(out string path))
{
Interlocked.Increment(ref _threadCounter);
Task.Run(() => WalkDrive(path));
}
}
}
private static void WalkDrive(string directory)
{
foreach (string file in Directory.GetFiles(directory))
{
//If file meets conditions, delete
}
string[] subDirectories = Directory.GetDirectories(directory);
if (subDirectories.Length != 0)
{
foreach (string subDirectory in subDirectories)
{
_targetPathQueue.Enqueue(subDirectory);
}
}
else { } //do other stuff;
Interlocked.Decrement(ref _threadCounter);
}
}
我的问题是,是否 safe/worth 只是从已经 运行ning 的任务中初始化新任务以避免浪费处理器时间监视队列?看起来像这样的东西:
static class DriveHandler
{
internal static readonly List<string> fixedDrives = GetFixedDrives();
private static int _threadCounter = 0;
internal static void WipeDrives()
{
foreach (string driveLetter in fixedDrives)
{
Interlocked.Increment(ref _threadCounter);
Task.Run(() => WalkDrive(driveLetter));
}
while (Volatile.Read(ref _threadCounter) > 0)
{
Thread.Sleep(5000);
}
}
private static void WalkDrive(string directory)
{
foreach (string file in Directory.GetFiles(directory))
{
//If file meets conditions, delete
}
string[] subDirectories = Directory.GetDirectories(directory);
if (subDirectories.Length != 0)
{
foreach (string subDirectory in subDirectories)
{
Interlocked.Increment(ref _threadCounter);
Task.Run(() => WalkDrive(path));
}
}
else { } //do other stuff;
Interlocked.Decrement(ref _threadCounter);
}
}
我当然需要每个任务一旦完成就死掉,这样做会使旧任务 parents 变成新任务并让它们存活直到所有 children 都完成?
非常感谢!
第一个问题:
Task.Run(() => WalkDrive(path));
这是一种一劳永逸的时尚,在这种情况下做这件事不是一件好事,为什么?因为有可能,你在硬盘上拥有的文件和路径比机器拥有的 CPU 和内存容量要多(任务消耗的内存也不仅仅是 CPU)。发射后忘记,因此得名,您不断产生任务而没有 await
ing 它们。
My question is, is it safe/worth it to just initialize the new tasks from within the already running tasks to avoid wasting processor time monitoring the queue?
这是有效的,没有什么可以阻止你这样做,但你已经在浪费资源了,为什么每次都产生新任务?您已经有一个 运行,只需将其设为一个长 运行 后台任务并保留它 运行,只需两个线程(我假设一个是(UI/user 面向)线程) 和一个做这项工作。
所有这些锁和任务产生都会损害您的性能并浪费所有资源 CPU + 内存分配。
如果你想通过并行执行来加速你可以把路径加到并发队列中,并且只有10-100个并发任务MAX之类的,至少你有一个上限,你控制多少代码并行执行。
虽然conccurent-queue不为空且没有人请求取消操作:
- 从基本路径开始
- 获取所有 sub-paths 并将它们排入 concurrent-queue
- 处理该路径中的文件
- 使当前基本路径成为队列中的下一个可用项目
- 重新开始。
您只需启动最大并发任务数即可。
您的主要 loop/while 条件类似于:
private async Task StartAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>();
for (int i = 0; i < MaxConcurrentTasks; i++)
{
tasks.Add(Task.Run(() => ProcessPath(initialPathHere), cancellationToken));
}
await Task.WhenAll(tasks);
}
然后沿着这些线:
private static async Task ProcessPath(string path, CancellationToken cancellationToken)
{
while(concurrentDictionary.Count > 0 && !cancellationToken.IsCancellationRequested)
{
foreach(var subPath in System.IO.Directory.EnumerateDirectories(path))
{
//Enqueue the subPath into the concurrent dictionary
}
//Once finished, process files in the current path
foreach (var file in path)
{
}
path = concurrentDictionary.Dequeue();
}
}
还没有检查过语法,但我认为这是一个好的算法应该做的。此外,请记住,当任务完成其当前作业时,此行中的队列可能为空,因此请相应地修改该代码。
path = concurrentDictionary.Dequeue();
最后的笔记:
- 考虑任务与并行之间的权衡。Invok/execute
- 考虑使用
BackgroundServices
它们 fine-tuned 很长 运行,取决于您的代码和要求
- 为了提高性能,请记住黄金法则
measure early
。从测量开始,手头有一些指标,所以如果你想加快速度,你至少知道你现在能做多少,所以你重构并再次测量并比较,然后你就会知道你是否离你的目标越来越近或越来越远。
- 确保你 conccurency/paralell 处理得当,否则它只会对你不利。
我正在编写一个程序,它可以搜索计算机的整个文件系统,以销毁任何符合特定参数的文件。我希望程序尽可能快地 运行 并利用尽可能多的资源来实现这一点(值得注意的是,在此过程进行期间,用户不应完成任何其他工作)。为此,我编写了一个方法,它获取一个目标目录,搜索其中的所有文件,然后为每个 child 目录排队一个新任务。目前这是通过将目录的路径传递到队列中来完成的,主线程监视该队列并使用它来实际初始化新任务,如下所示:
static class DriveHandler
{
internal static readonly List<string> fixedDrives = GetFixedDrives();
private static readonly ConcurrentQueue<string> _targetPathQueue = new ConcurrentQueue<string>();
private static int _threadCounter = 0;
internal static void WipeDrives()
{
foreach (string driveLetter in fixedDrives)
{
Interlocked.Increment(ref _threadCounter);
Task.Run(() => WalkDrive(driveLetter));
}
while (Volatile.Read(ref _threadCounter) > 0 || !_targetPathQueue.IsEmpty)
{
if (_targetPathQueue.TryDequeue(out string path))
{
Interlocked.Increment(ref _threadCounter);
Task.Run(() => WalkDrive(path));
}
}
}
private static void WalkDrive(string directory)
{
foreach (string file in Directory.GetFiles(directory))
{
//If file meets conditions, delete
}
string[] subDirectories = Directory.GetDirectories(directory);
if (subDirectories.Length != 0)
{
foreach (string subDirectory in subDirectories)
{
_targetPathQueue.Enqueue(subDirectory);
}
}
else { } //do other stuff;
Interlocked.Decrement(ref _threadCounter);
}
}
我的问题是,是否 safe/worth 只是从已经 运行ning 的任务中初始化新任务以避免浪费处理器时间监视队列?看起来像这样的东西:
static class DriveHandler
{
internal static readonly List<string> fixedDrives = GetFixedDrives();
private static int _threadCounter = 0;
internal static void WipeDrives()
{
foreach (string driveLetter in fixedDrives)
{
Interlocked.Increment(ref _threadCounter);
Task.Run(() => WalkDrive(driveLetter));
}
while (Volatile.Read(ref _threadCounter) > 0)
{
Thread.Sleep(5000);
}
}
private static void WalkDrive(string directory)
{
foreach (string file in Directory.GetFiles(directory))
{
//If file meets conditions, delete
}
string[] subDirectories = Directory.GetDirectories(directory);
if (subDirectories.Length != 0)
{
foreach (string subDirectory in subDirectories)
{
Interlocked.Increment(ref _threadCounter);
Task.Run(() => WalkDrive(path));
}
}
else { } //do other stuff;
Interlocked.Decrement(ref _threadCounter);
}
}
我当然需要每个任务一旦完成就死掉,这样做会使旧任务 parents 变成新任务并让它们存活直到所有 children 都完成?
非常感谢!
第一个问题:
Task.Run(() => WalkDrive(path));
这是一种一劳永逸的时尚,在这种情况下做这件事不是一件好事,为什么?因为有可能,你在硬盘上拥有的文件和路径比机器拥有的 CPU 和内存容量要多(任务消耗的内存也不仅仅是 CPU)。发射后忘记,因此得名,您不断产生任务而没有 await
ing 它们。
My question is, is it safe/worth it to just initialize the new tasks from within the already running tasks to avoid wasting processor time monitoring the queue?
这是有效的,没有什么可以阻止你这样做,但你已经在浪费资源了,为什么每次都产生新任务?您已经有一个 运行,只需将其设为一个长 运行 后台任务并保留它 运行,只需两个线程(我假设一个是(UI/user 面向)线程) 和一个做这项工作。 所有这些锁和任务产生都会损害您的性能并浪费所有资源 CPU + 内存分配。
如果你想通过并行执行来加速你可以把路径加到并发队列中,并且只有10-100个并发任务MAX之类的,至少你有一个上限,你控制多少代码并行执行。
虽然conccurent-queue不为空且没有人请求取消操作:
- 从基本路径开始
- 获取所有 sub-paths 并将它们排入 concurrent-queue
- 处理该路径中的文件
- 使当前基本路径成为队列中的下一个可用项目
- 重新开始。
您只需启动最大并发任务数即可。
您的主要 loop/while 条件类似于:
private async Task StartAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>();
for (int i = 0; i < MaxConcurrentTasks; i++)
{
tasks.Add(Task.Run(() => ProcessPath(initialPathHere), cancellationToken));
}
await Task.WhenAll(tasks);
}
然后沿着这些线:
private static async Task ProcessPath(string path, CancellationToken cancellationToken)
{
while(concurrentDictionary.Count > 0 && !cancellationToken.IsCancellationRequested)
{
foreach(var subPath in System.IO.Directory.EnumerateDirectories(path))
{
//Enqueue the subPath into the concurrent dictionary
}
//Once finished, process files in the current path
foreach (var file in path)
{
}
path = concurrentDictionary.Dequeue();
}
}
还没有检查过语法,但我认为这是一个好的算法应该做的。此外,请记住,当任务完成其当前作业时,此行中的队列可能为空,因此请相应地修改该代码。
path = concurrentDictionary.Dequeue();
最后的笔记:
- 考虑任务与并行之间的权衡。Invok/execute
- 考虑使用
BackgroundServices
它们 fine-tuned 很长 运行,取决于您的代码和要求 - 为了提高性能,请记住黄金法则
measure early
。从测量开始,手头有一些指标,所以如果你想加快速度,你至少知道你现在能做多少,所以你重构并再次测量并比较,然后你就会知道你是否离你的目标越来越近或越来越远。 - 确保你 conccurency/paralell 处理得当,否则它只会对你不利。