获取多线程应用程序生成的当前活动任务数
Get count of current active Tasks spawned by a multithreaded application
您好,我是一名几乎没有 C# 经验的实习生,我遇到了接管 windows 服务的情况,该服务使用 TaskCompletionSource
和 BlockingCollection
来实现多线程。我从来没有做过C#。我正在尝试优化该服务处理其处理日志文件任务的方式。
我的问题是,使用 BlockingCollection
创建一个执行 WorkiItem
的线程队列,如何获取队列中活动线程的数量?意思是 EnqueueTask()
命令调用的项目中有多少仍处于 运行 状态?我不想要 _taskQ.Count
returns 队列积压的计数。我想要活动线程数。我想将线程数保持在四个,并且只在前一个项目完成后才将一个项目排入队列。我不想在我的队列中出现一堆项目。
public class ProducerConsumerQueue
{
public CancellationTokenSource Token { get; set; }
private BlockingCollection<WorkItem> _taskQ;
public ProducerConsumerQueue(int workerCount)
{
_taskQ = new BlockingCollection<WorkItem>();
for(int i = 0; i <workerCount; i++)
{
Task.Factory.StartNew(Consume);
}
}
public Task EnqueueTask(Action action, CancellationToken? cancelToken)
{
var tcs = new TaskCompletionSource<object>();
_taskQ.Add(new WorkItem(tcs, action, cancelToken));
return tcs.Task;
}
public void Consume()
{
foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
{
if (workItem.CancelToken.HasValue &&
workItem.CancelToken.Value.IsCancellationRequested)
{
workItem.TaskSource.SetCanceled();
}
else
{
try
{
workItem.Action();
workItem.TaskSource.SetResult(null);
}
catch (OperationCanceledException ex)
{
if (ex.CancellationToken == workItem.CancelToken)
{
workItem.TaskSource.SetCanceled();
}
else
{
workItem.TaskSource.SetException(ex);
}
}
catch (Exception ex)
{
workItem.TaskSource.SetException(ex);
}
}
}
}
}
此 ProducerCOnsumer
队列在服务启动时调用,其队列在每个服务轮询间隔重新加载。我想通过将其设置为线程安全数据库 table 中的文件限制来限制生成这些线程的数量。因此,如果线程数为 4,则 db table 中的文件数将为 4。在完成 1 个文件之前,队列不应产生额外的线程或将文件排入队列。为此,我认为一个简单的解决方案是计算活动线程的数量(这意味着活跃的文件处理量并且在线程下降 1 之前不添加任何新文件:
protected override void OnStart(string[] args)
{
ProducerConsumerQueue = new ProducerConsumerQueue(Constants.THREAD_COUNT);
InitializeLogging();
PollOnServiceStart();
_timer.Elapsed += OnElapsedTime;
_timer.Enabled = true;
_timer.Interval = _interval;
}
public void OnElapsedTime(object source, ElapsedEventArgs args)
{
try
{
//InitializeLogging();
Poll();
}
catch (Exception ex)
{
Logger.Error(ex.Message.ToString());
}
}
public void PollOnServiceStart()
{
foreach (var handler in handlers)
{
ProducerConsumerQueue.EnqueueTask(handler.Execute, CancellationTokenSource.Token);
}
}
如果您想要一个线程安全的计数器来检查活动任务的数量,您可以使用 int _counter
字段和 Interlocked.Increment(ref _counter)
/Interlocked.Decrement(ref _counter)
.
请记住在 try
块中的第一行递增,在 finally
块中递减,这样如果出现异常,您就不会丢失对任何一个的调用。
https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked.increment?view=net-5.0
https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked.decrement?view=net-5.0
您好,我是一名几乎没有 C# 经验的实习生,我遇到了接管 windows 服务的情况,该服务使用 TaskCompletionSource
和 BlockingCollection
来实现多线程。我从来没有做过C#。我正在尝试优化该服务处理其处理日志文件任务的方式。
我的问题是,使用 BlockingCollection
创建一个执行 WorkiItem
的线程队列,如何获取队列中活动线程的数量?意思是 EnqueueTask()
命令调用的项目中有多少仍处于 运行 状态?我不想要 _taskQ.Count
returns 队列积压的计数。我想要活动线程数。我想将线程数保持在四个,并且只在前一个项目完成后才将一个项目排入队列。我不想在我的队列中出现一堆项目。
public class ProducerConsumerQueue
{
public CancellationTokenSource Token { get; set; }
private BlockingCollection<WorkItem> _taskQ;
public ProducerConsumerQueue(int workerCount)
{
_taskQ = new BlockingCollection<WorkItem>();
for(int i = 0; i <workerCount; i++)
{
Task.Factory.StartNew(Consume);
}
}
public Task EnqueueTask(Action action, CancellationToken? cancelToken)
{
var tcs = new TaskCompletionSource<object>();
_taskQ.Add(new WorkItem(tcs, action, cancelToken));
return tcs.Task;
}
public void Consume()
{
foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
{
if (workItem.CancelToken.HasValue &&
workItem.CancelToken.Value.IsCancellationRequested)
{
workItem.TaskSource.SetCanceled();
}
else
{
try
{
workItem.Action();
workItem.TaskSource.SetResult(null);
}
catch (OperationCanceledException ex)
{
if (ex.CancellationToken == workItem.CancelToken)
{
workItem.TaskSource.SetCanceled();
}
else
{
workItem.TaskSource.SetException(ex);
}
}
catch (Exception ex)
{
workItem.TaskSource.SetException(ex);
}
}
}
}
}
此 ProducerCOnsumer
队列在服务启动时调用,其队列在每个服务轮询间隔重新加载。我想通过将其设置为线程安全数据库 table 中的文件限制来限制生成这些线程的数量。因此,如果线程数为 4,则 db table 中的文件数将为 4。在完成 1 个文件之前,队列不应产生额外的线程或将文件排入队列。为此,我认为一个简单的解决方案是计算活动线程的数量(这意味着活跃的文件处理量并且在线程下降 1 之前不添加任何新文件:
protected override void OnStart(string[] args)
{
ProducerConsumerQueue = new ProducerConsumerQueue(Constants.THREAD_COUNT);
InitializeLogging();
PollOnServiceStart();
_timer.Elapsed += OnElapsedTime;
_timer.Enabled = true;
_timer.Interval = _interval;
}
public void OnElapsedTime(object source, ElapsedEventArgs args)
{
try
{
//InitializeLogging();
Poll();
}
catch (Exception ex)
{
Logger.Error(ex.Message.ToString());
}
}
public void PollOnServiceStart()
{
foreach (var handler in handlers)
{
ProducerConsumerQueue.EnqueueTask(handler.Execute, CancellationTokenSource.Token);
}
}
如果您想要一个线程安全的计数器来检查活动任务的数量,您可以使用 int _counter
字段和 Interlocked.Increment(ref _counter)
/Interlocked.Decrement(ref _counter)
.
请记住在 try
块中的第一行递增,在 finally
块中递减,这样如果出现异常,您就不会丢失对任何一个的调用。
https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked.increment?view=net-5.0
https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked.decrement?view=net-5.0