在无限循环中限制线程数的并发操作
concurrect oprating with limit on thread count in a infinity loop
我编写了一个无限循环,用于从队列 (RabbitMQ) 中拉取并在并发线程中处理每个拉取的项目,并发线程数量有限 运行 线程。
现在我想要一个限制线程执行的解决方案 count.see 我的循环示例:
public class ThreadWorker<T>
{
public List<T> _lst;
private int _threadCount;
private int _maxThreadCount;
public ThreadWorker(List<T> lst, int maxThreadCount)
{
_lst = lst;
_maxThreadCount = maxThreadCount;
}
public void Start()
{
var i = 0;
while (i < _lst.Count)
{
i++;
var pull = _lst[i];
Process(pull);
}
}
public void Process(T item)
{
if (_threadCount > _maxThreadCount)
{
//wait any opration be done
// How to wait for one thread?
Interlocked.Decrement(ref _threadCount);
}
var t = new Thread(() => Opration(item));
t.Start();
Interlocked.Increment(ref _threadCount);
}
public void Opration(T item)
{
Console.WriteLine(item.ToString());
}
}
请注意,当我使用信号量进行限制时,Start() 方法不会等待所有 运行 线程。我的循环应该在 运行 个具有 _maxThreadCount 的线程之后,等待直到释放一个线程然后推送新线程以进行并发处理。
我会用Semaphore这种方式来控制线程数:
public class ThreadWorker<T>
{
SemaphoreSlim _sem = null;
List<T> _lst;
public ThreadWorker(List<T> lst, int maxThreadCount)
{
_lst = lst;
_sem = new SemaphoreSlim(maxThreadCount);
}
public void Start()
{
var i = 0;
while (i < _lst.Count)
{
i++;
var pull = _lst[i];
_sem.Wait(); /*****/
Process(pull);
}
}
public void Process(T item)
{
var t = new Thread(() => Opration(item));
t.Start();
}
public void Opration(T item)
{
Console.WriteLine(item.ToString());
_sem.Release(); /*****/
}
}
我编写了一个无限循环,用于从队列 (RabbitMQ) 中拉取并在并发线程中处理每个拉取的项目,并发线程数量有限 运行 线程。 现在我想要一个限制线程执行的解决方案 count.see 我的循环示例:
public class ThreadWorker<T>
{
public List<T> _lst;
private int _threadCount;
private int _maxThreadCount;
public ThreadWorker(List<T> lst, int maxThreadCount)
{
_lst = lst;
_maxThreadCount = maxThreadCount;
}
public void Start()
{
var i = 0;
while (i < _lst.Count)
{
i++;
var pull = _lst[i];
Process(pull);
}
}
public void Process(T item)
{
if (_threadCount > _maxThreadCount)
{
//wait any opration be done
// How to wait for one thread?
Interlocked.Decrement(ref _threadCount);
}
var t = new Thread(() => Opration(item));
t.Start();
Interlocked.Increment(ref _threadCount);
}
public void Opration(T item)
{
Console.WriteLine(item.ToString());
}
}
请注意,当我使用信号量进行限制时,Start() 方法不会等待所有 运行 线程。我的循环应该在 运行 个具有 _maxThreadCount 的线程之后,等待直到释放一个线程然后推送新线程以进行并发处理。
我会用Semaphore这种方式来控制线程数:
public class ThreadWorker<T>
{
SemaphoreSlim _sem = null;
List<T> _lst;
public ThreadWorker(List<T> lst, int maxThreadCount)
{
_lst = lst;
_sem = new SemaphoreSlim(maxThreadCount);
}
public void Start()
{
var i = 0;
while (i < _lst.Count)
{
i++;
var pull = _lst[i];
_sem.Wait(); /*****/
Process(pull);
}
}
public void Process(T item)
{
var t = new Thread(() => Opration(item));
t.Start();
}
public void Opration(T item)
{
Console.WriteLine(item.ToString());
_sem.Release(); /*****/
}
}