如何通过 ConcurrentDictionary 使用任务
How to use tasks with ConcurrentDictionary
我必须编写一个程序,从数据库中读取要处理的队列,所有队列都是 运行 并行的,并使用 ConcurrentDictionary 在父线程上进行管理。
我有一个代表队列的 class ,它有一个构造函数接收队列信息和父实例句柄。队列class也有处理队列的方法。
这是队列 Class:
Class MyQueue {
protected ServiceExecution _parent;
protect string _queueID;
public MyQueue(ServiceExecution parentThread, string queueID)
{
_parent = parentThread;
_queueID = queueID;
}
public void Process()
{
try
{
//Do work to process
}
catch()
{
//exception handling
}
finally{
_parent.ThreadFinish(_queueID);
}
父线程循环遍历队列数据集并实例化一个新队列class。它生成一个新线程以异步执行 Queue 对象的 Process 方法。这个线程添加到ConcurrentDictionary,然后启动如下:
private ConcurrentDictionary<string, MyQueue> _runningQueues = new ConcurrentDictionary<string, MyQueue>();
Foreach(datarow dr in QueueDataset.rows)
{
MyQueue queue = new MyQueue(this, dr["QueueID"].ToString());
Thread t = new Thread(()=>queue.Process());
if(_runningQueues.TryAdd(dr["QueueID"].ToString(), queue)
{
t.start();
}
}
//Method that gets called by the queue thread when it finishes
public void ThreadFinish(string queueID)
{
MyQueue queue;
_runningQueues.TryRemove(queueID, out queue);
}
我感觉这不是管理异步队列处理的正确方法,我想知道我是否可以 运行 使用此设计陷入死锁?此外,我想使用任务来异步 运行 队列而不是新线程。我需要跟踪队列,因为如果前一个 运行 尚未完成,我不会为同一队列生成新线程或任务。处理这种并行性的最佳方法是什么?
提前致谢!
关于您目前的做法
确实这不是正确的做法。从数据库读取的大量队列会产生大量线程,这可能是不好的。您每次都会创建一个新线程。最好创建一些线程然后重新使用它们。如果你想要任务,最好创建 LongRunning
任务并重新使用它们。
建议的设计
我建议采用以下设计:
- 只保留一个任务从数据库读取队列并将这些队列放入 BlockingCollection;
- 现在启动多个
LongRunning
任务以分别从 BlockingCollection 中读取一个队列并处理该队列;
- 当一个任务处理完它从 BlockingCollection 中取出的队列时,它会从那个 BlockingCollection 中取出另一个队列;
- 优化这些 处理 任务的数量,以便正确利用 CPU 的核心。通常由于 DB 交互很慢,您可以创建比内核数量多 3 倍的任务,但是 YMMV。
死锁可能性
它们至少不会发生在应用端。但是,由于队列是数据库事务,死锁可能发生在数据库端。如果数据库由于死锁而回滚,您可能必须编写一些逻辑以使您的任务再次启动事务。
示例代码
private static void TaskDesignedRun()
{
var expectedParallelQueues = 1024; //Optimize it. I've chosen it randomly
var parallelProcessingTaskCount = 4 * Environment.ProcessorCount; //Optimize this too.
var baseProcessorTaskArray = new Task[parallelProcessingTaskCount];
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var itemsToProcess = new BlockingCollection<MyQueue>(expectedParallelQueues);
//Start a new task to populate the "itemsToProcess"
taskFactory.StartNew(() =>
{
// Add code to read queues and add them to itemsToProcess
Console.WriteLine("Done reading all the queues...");
// Finally signal that you are done by saying..
itemsToProcess.CompleteAdding();
});
//Initializing the base tasks
for (var index = 0; index < baseProcessorTaskArray.Length; index++)
{
baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
{
while (!itemsToProcess.IsAddingCompleted && itemsToProcess.Count != 0) {
MyQueue q;
if (!itemsToProcess.TryTake(out q)) continue;
//Process your queue
}
});
}
//Now just wait till all queues in your database have been read and processed.
Task.WaitAll(baseProcessorTaskArray);
}
我必须编写一个程序,从数据库中读取要处理的队列,所有队列都是 运行 并行的,并使用 ConcurrentDictionary 在父线程上进行管理。 我有一个代表队列的 class ,它有一个构造函数接收队列信息和父实例句柄。队列class也有处理队列的方法。
这是队列 Class:
Class MyQueue {
protected ServiceExecution _parent;
protect string _queueID;
public MyQueue(ServiceExecution parentThread, string queueID)
{
_parent = parentThread;
_queueID = queueID;
}
public void Process()
{
try
{
//Do work to process
}
catch()
{
//exception handling
}
finally{
_parent.ThreadFinish(_queueID);
}
父线程循环遍历队列数据集并实例化一个新队列class。它生成一个新线程以异步执行 Queue 对象的 Process 方法。这个线程添加到ConcurrentDictionary,然后启动如下:
private ConcurrentDictionary<string, MyQueue> _runningQueues = new ConcurrentDictionary<string, MyQueue>();
Foreach(datarow dr in QueueDataset.rows)
{
MyQueue queue = new MyQueue(this, dr["QueueID"].ToString());
Thread t = new Thread(()=>queue.Process());
if(_runningQueues.TryAdd(dr["QueueID"].ToString(), queue)
{
t.start();
}
}
//Method that gets called by the queue thread when it finishes
public void ThreadFinish(string queueID)
{
MyQueue queue;
_runningQueues.TryRemove(queueID, out queue);
}
我感觉这不是管理异步队列处理的正确方法,我想知道我是否可以 运行 使用此设计陷入死锁?此外,我想使用任务来异步 运行 队列而不是新线程。我需要跟踪队列,因为如果前一个 运行 尚未完成,我不会为同一队列生成新线程或任务。处理这种并行性的最佳方法是什么?
提前致谢!
关于您目前的做法
确实这不是正确的做法。从数据库读取的大量队列会产生大量线程,这可能是不好的。您每次都会创建一个新线程。最好创建一些线程然后重新使用它们。如果你想要任务,最好创建 LongRunning
任务并重新使用它们。
建议的设计
我建议采用以下设计:
- 只保留一个任务从数据库读取队列并将这些队列放入 BlockingCollection;
- 现在启动多个
LongRunning
任务以分别从 BlockingCollection 中读取一个队列并处理该队列; - 当一个任务处理完它从 BlockingCollection 中取出的队列时,它会从那个 BlockingCollection 中取出另一个队列;
- 优化这些 处理 任务的数量,以便正确利用 CPU 的核心。通常由于 DB 交互很慢,您可以创建比内核数量多 3 倍的任务,但是 YMMV。
死锁可能性
它们至少不会发生在应用端。但是,由于队列是数据库事务,死锁可能发生在数据库端。如果数据库由于死锁而回滚,您可能必须编写一些逻辑以使您的任务再次启动事务。
示例代码
private static void TaskDesignedRun()
{
var expectedParallelQueues = 1024; //Optimize it. I've chosen it randomly
var parallelProcessingTaskCount = 4 * Environment.ProcessorCount; //Optimize this too.
var baseProcessorTaskArray = new Task[parallelProcessingTaskCount];
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var itemsToProcess = new BlockingCollection<MyQueue>(expectedParallelQueues);
//Start a new task to populate the "itemsToProcess"
taskFactory.StartNew(() =>
{
// Add code to read queues and add them to itemsToProcess
Console.WriteLine("Done reading all the queues...");
// Finally signal that you are done by saying..
itemsToProcess.CompleteAdding();
});
//Initializing the base tasks
for (var index = 0; index < baseProcessorTaskArray.Length; index++)
{
baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
{
while (!itemsToProcess.IsAddingCompleted && itemsToProcess.Count != 0) {
MyQueue q;
if (!itemsToProcess.TryTake(out q)) continue;
//Process your queue
}
});
}
//Now just wait till all queues in your database have been read and processed.
Task.WaitAll(baseProcessorTaskArray);
}