限制 运行 线程的安全方法 C#

Safe Way to Limit Running Threads C#

我正在编写一个程序,它将 运行 执行各种任务。我已经设置了我称之为 "Task Queue" 的东西,我将在其中不断获取要处理的下一个任务(如果有的话)并启动一个新线程来处理该任务。但是,出于明显的原因,我想限制一次可以产生的线程数量。我创建了一个变量来跟上要生成的最大线程数,一个变量用于当前线程数。我正在考虑使用锁来尝试准确地跟上当前的线程数。这是我的总体思路。

public class Program {
  private static int mintThreadCount;
  private static int mintMaxThreadCount = 10;
  private static object mobjLock = new object();
  static void Main(string[] args) {
     mintThreadCount = 0;
     int i = 100;
     while(i > 0) {
        StartNewThread();
        i--;
     }
     Console.Read();
  }

  private static void StartNewThread() {
     lock(mobjLock) {
        if(mintThreadCount < mintMaxThreadCount) {
           Thread newThread = new Thread(StartTask);
           newThread.Start(mintThreadCount);
           mintThreadCount++;
        }
        else {
           Console.WriteLine("Max Thread Count Reached.");
        }
     }
  }

  private static void StartTask(object iCurrentThreadCount) {
     int id = new Random().Next(0, 1000000);
     Console.WriteLine("New Thread with id of: " + id.ToString() + " Started. Current Thread count: " + ((int)iCurrentThreadCount).ToString());
     Thread.Sleep(new Random().Next(0, 3000));
     lock(mobjLock) {
        Console.WriteLine("Ending thread with id of: " + id.ToString() + " now.");
        mintThreadCount--;
        Console.WriteLine("Thread space release by id of: " + id.ToString() + " . Thread count now at: " + mintThreadCount);
     }
  }
}

由于我在两个地方锁定以访问同一个变量(启动新线程时递增,结束时递减)是否有可能等待锁递减的线程挂起并且永远不会结束?从而达到最大线程数并且永远无法启动另一个?对我的方法还有其他建议吗?

先问最简单的问题……:)

…is there a chance that the thread waiting on the lock to decrement could get hung up and never end?

不,不在您发布的代码中。 None 的代码在等待计数更改或类似的事情时持有锁。您只需获取锁,然后修改计数或发出消息,然后立即释放锁。所以没有线程会无限期地持有锁,也没有嵌套锁(如果操作不当会导致死锁)。

现在,就是说:根据您发布的代码和您的问题,还不完全清楚这里的 intent 是什么。编写的代码确实会限制创建的线程数。但是一旦达到该限制(并且会很快达到),主循环就会旋转,报告 "Max Thread Count Reached.".

确实,在总循环次数为 100 的情况下,我认为整个循环有可能在第一个线程到达 运行 之前完成,具体取决于其他什么正在占用 CPU系统上的核心。如果某些线程确实达到 运行 并且碰巧其中一些线程的休眠持续时间非常短,那么您以后可能会偷偷加入更多线程。但是循环的大多数迭代将看到最大线程数,报告已达到限制并继续循环的下一次迭代。

您在评论中写下 "the main thread should never be blocked"(如果您认为相关,您真的应该在 问题 本身中添加一些内容)。当然,问题来了,主线程在没有阻塞的情况下在做什么?主线程如何知道是否以及何时尝试安排新线程?

如果您想要一个真正有用的答案,这些都是重要的细节。

请注意,已向您提供了使用信号量的建议(具体来说,SemaphoreSlim)。这 可能 是个好主意,但请注意 class 通常用于协调所有竞争同一资源的多个线程。为了使它有用,您实际上拥有 多于 的线程,信号量确保在给定时间只有 10 个线程到达 运行。

在你的情况下,在我看来你实际上是在问如何避免首先创建额外的线程。 IE。您希望主循环检查计数,如果达到最大计数则根本不创建线程。在这种情况下,一种可能的解决方案可能是直接使用 Monitor class:

private static void StartNewThread() {
    lock(mobjLock) {
        while (mintThreadCount >= mintMaxThreadCount) {
           Console.WriteLine("Max Thread Count Reached.");
           Monitor.Wait(mobjLock);
        }

        Thread newThread = new Thread(StartTask);
        newThread.Start(mintThreadCount);
        mintThreadCount++;
        }
    }
}

以上将导致 StartNewThread() 方法等待计数低于最大值,然后将始终创建一个新线程。

当然,每个线程都需要发出它更新计数的信号,以便上面的循环可以从等待中释放并检查计数:

private readonly Random _rnd = new Random();

private static void StartTask(object iCurrentThreadCount) {
    int id = _rnd.Next(0, 1000000);
    Console.WriteLine("New Thread with id of: " + id.ToString() + " Started. Current Thread count: " + ((int)iCurrentThreadCount).ToString());
    Thread.Sleep(_rnd.Next(0, 3000));
    lock(mobjLock) {
        Console.WriteLine("Ending thread with id of: " + id.ToString() + " now.");
        mintThreadCount--;
        Console.WriteLine("Thread space release by id of: " + id.ToString() + " . Thread count now at: " + mintThreadCount);
        Monitor.Pulse(mobjLock);
    }
}

上面的问题是它阻塞主循环。如果我没理解错的话,你不会想要的。

(注意: 你的代码中有一个常见但严重的错误,因为你每次需要一个随机数时都会创建一个新的 Random 对象. 要正确使用 Random class,您必须只创建一个实例并在需要新随机数时重复使用它。我已经调整了上面的代码示例以解决该问题)。

上述问题和您的原始版本的另一个问题是每个新任务都分配了一个全新的线程。线程的创建甚至存在都非常昂贵,这就是线程池存在的原因。根据您的实际情况,您可能应该只使用例如ParallelParallelEnumerableTask 来管理您的任务。

但如果您真的想显式地完成这一切,一种选择是简单地启动十个线程,并让它们从 BlockingCollection<T> 检索数据以进行操作。由于您恰好启动了 10 个线程,因此您知道您的线程永远不会超过 运行ning。当有足够的工作让所有十个线程都忙时,它们就会忙。否则,队列将为空,部分或全部将等待新数据显示在队列中。闲置,但未使用任何 CPU 资源。

例如:

private BlockingCollection<int> _queue = new BlockingCollection<int>();

private static void StartThreads() {
    for (int i = 0; i < mintMaxThreadCount; i++) {
        new Thread(StartTask).Start();
    }
}

private static void StartTask() {
    // NOTE: a random number can't be a reliable "identification", as two or
    // more threads could theoretically get the same "id".
    int id = new Random().Next(0, 1000000);
    Console.WriteLine("New Thread with id of: " + id.ToString() + " Started.");
    foreach (int i in _queue) {
        Thread.Sleep(i);
    }
    Thread.Sleep(new Random().Next(0, 3000));
}

您只需在某个地方调用一次 StartThreads(),而不是多次调用您的其他 StartNewThread() 方法。大概是在你提到的 while (true) 循环之前。

然后由于需要处理一些任务,你只需向队列中添加数据,例如:

_queue.Add(_rnd.Next(0, 3000));

当您希望线程全部退出时(例如,在您的主循环退出后,无论如何发生):

_queue.CompleteAdding();

这将导致正在进行的每个 foreach 循环结束,让每个线程退出。

当然,BlockingCollection<T>T 类型参数可以是任何东西。据推测,在您的情况下,它实际上代表 "task"。我使用 int,只是因为在你的示例中那实际上是你的 "task"(即线程应该休眠的毫秒数)。

然后你的主线程就可以做它通常做的任何事情,调用 Add() 方法根据需要将新工作分派给你的消费者线程。

同样,如果没有更多细节,我无法真正评论这种方法是否比使用内置任务运行ning 机制更好在.NET 中。但鉴于您目前所解释的内容,它应该运作良好。