多线程并发队列
ConcurrentQueue with multithreading
我不熟悉多线程概念。我需要将一定数量的字符串添加到队列中并用多个线程处理它们。使用线程安全的 ConcurrentQueue
。
这是我试过的。但是所有添加到并发队列的项目都没有被处理。仅处理前 4 项。
class Program
{
ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
static void Main(string[] args)
{
new Program().run();
}
void run()
{
int threadCount = 4;
Task[] workers = new Task[threadCount];
for (int i = 0; i < threadCount; ++i)
{
int workerId = i;
Task task = new Task(() => worker(workerId));
workers[i] = task;
task.Start();
}
for (int i = 0; i < 100; i++)
{
iQ.Enqueue("Item" + i);
}
Task.WaitAll(workers);
Console.WriteLine("Done.");
Console.ReadLine();
}
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
string op;
if(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
}
你的工人从queue
中取出一件然后完成工作,让他们一直工作到queue
是空的。
将worker函数中的if
替换为while
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
string op;
while (iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
您会 运行 您会看到,几乎所有项目都将由两名工人处理。原因:您的 cpu 有两个核心,都在工作并且没有 "free tiem slot" 来创建新任务。如果你想让你所有的 4 个任务来处理项目,你可以添加一个延迟,让你的处理器有时间创建另一个任务,比如:
while (iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
}
给你想要的输出:
...
Worker 0 is processing item Item8
Worker 1 is processing item Item9
Worker 2 is processing item Item10
Worker 3 is processing item Item11
Worker 3 is processing item Item13
Worker 1 is processing item Item12
...
您的实施存在一些问题。第一个也是显而易见的是 worker
方法仅使零个或一个项目出列然后停止:
if(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
应该是:
while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
但这还不足以使您的程序正常工作。如果您的工作人员出队速度快于主线程入队速度,则它们将在主任务仍在入队时停止。您需要向工人发出他们可以停下来的信号。您可以定义一个布尔变量,一旦入队完成,该变量将设置为 true
:
for (int i = 0; i < 100; i++)
{
iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);
工作人员将检查值:
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
do {
string op;
while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
}
while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0))
Console.WriteLine("Worker {0} is stopping.", workerId);
}
我实际上最近一直在使用 ConcurrentQueue
,我想我会分享这个。我创建了一个名为 CQItems
的自定义 ConcurrentQueue
,它具有使用给定参数构建自身的方法。在内部,当您告诉它构建 x 数量的 y 项目时,它会对项目构造函数进行 Parallel.For
调用。这里的好处是,当一个方法或函数调用 CQItems myCQ = CQItems.Generate(x, y)
时,该调用来自基础应用程序线程,这意味着在完成构建之前,没有任何东西可以查看队列。但在队列 class 内部,它是用线程构建的,并且比仅使用 List<>
或 Queue<>
快得多。大多数情况下,它是凭空生成东西,但有时(基于参数)从 SQL 创建项目——基本上是根据现有数据生成对象。无论如何,这些是 CQItems
class 中可以帮助解决此问题的两种方法:
public void Generate(int numberOfItems = 1, ItemOptions options = ItemOptions.NONE)
{
try
{
Type t = typeof(T);
if (t == typeof(Item))
throw new ArgumentException("Type " + t + " is not valid for generation. Please contact QA.");
else
Parallel.For(0, numberOfItems, (i, loopState) =>
{
try
{
GenerateItemByType(typeof(T), options);
}
catch
{
loopState.Stop();
throw;
}
});
}
catch (AggregateException ae)
{
ae.Handle((x) =>
{
if (x is SQLNullResultsException)
{
throw x;
}
else if (x is ImageNotTIFFException)
{
throw x;
}
else
{
throw x;
}
return true;
});
}
catch
{
throw;
}
finally
{
ItemManager.Instance.Clear();
}
}
private void GenerateItemByType(Type t, ItemOptions options = ItemOptions.NONE)
{
try
{
if (t == typeof(ItemA))
{
if ((options & ItemOptions.DUPLICATE) != 0)
{
this.Enqueue(new ItemB(options));
}
else
{
this.Enqueue(new ItemA(options));
}
}
else if (t == typeof(ItemC))
{
this.Enqueue(new ItemC(options));
}
}
catch
{
throw;
}
finally { }
}
一些有用的笔记:
在 Parallel.For()
中提供 loopState
变量允许我们在捕获到异常时将状态设置为 stop。这很好,因为如果要求您的循环执行 1000 次操作,并且第 5 次迭代抛出异常,它将继续循环。你可能想要它,但在我的例子中,异常需要退出线程循环。您最终仍会得到一个 AggregateException
(显然,这正是线程抛出异常时发生的情况)。解析出那些并且只发送第一个可以节省大量时间和头痛试图通过一个巨大的异常组除草其中后来的异常可能(或可能不)是由第一个引起的。
至于重新抛出,我尝试为大多数预期类型的异常添加一个 catch 语句,即使我打算将它们扔到堆栈中也是如此。其中一部分用于故障排除(能够中断特定异常可能很方便)。部分原因是有时我希望能够做其他事情,例如停止循环、更改或添加异常消息,或者在拆分 AggregateException
的情况下,只发送一个异常备份堆栈而不是整个聚合。只是对可能正在查看此内容的任何人进行澄清。
最后,为了防止混淆,Type(T)
值来自我的 CQItems
class 本身:
public class CQItems<T> : ConcurrentQueue<Item>, IDisposable
我不熟悉多线程概念。我需要将一定数量的字符串添加到队列中并用多个线程处理它们。使用线程安全的 ConcurrentQueue
。
这是我试过的。但是所有添加到并发队列的项目都没有被处理。仅处理前 4 项。
class Program
{
ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
static void Main(string[] args)
{
new Program().run();
}
void run()
{
int threadCount = 4;
Task[] workers = new Task[threadCount];
for (int i = 0; i < threadCount; ++i)
{
int workerId = i;
Task task = new Task(() => worker(workerId));
workers[i] = task;
task.Start();
}
for (int i = 0; i < 100; i++)
{
iQ.Enqueue("Item" + i);
}
Task.WaitAll(workers);
Console.WriteLine("Done.");
Console.ReadLine();
}
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
string op;
if(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
}
你的工人从queue
中取出一件然后完成工作,让他们一直工作到queue
是空的。
将worker函数中的if
替换为while
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
string op;
while (iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
您会 运行 您会看到,几乎所有项目都将由两名工人处理。原因:您的 cpu 有两个核心,都在工作并且没有 "free tiem slot" 来创建新任务。如果你想让你所有的 4 个任务来处理项目,你可以添加一个延迟,让你的处理器有时间创建另一个任务,比如:
while (iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
}
给你想要的输出:
...
Worker 0 is processing item Item8
Worker 1 is processing item Item9
Worker 2 is processing item Item10
Worker 3 is processing item Item11
Worker 3 is processing item Item13
Worker 1 is processing item Item12
...
您的实施存在一些问题。第一个也是显而易见的是 worker
方法仅使零个或一个项目出列然后停止:
if(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
应该是:
while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
但这还不足以使您的程序正常工作。如果您的工作人员出队速度快于主线程入队速度,则它们将在主任务仍在入队时停止。您需要向工人发出他们可以停下来的信号。您可以定义一个布尔变量,一旦入队完成,该变量将设置为 true
:
for (int i = 0; i < 100; i++)
{
iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);
工作人员将检查值:
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
do {
string op;
while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
}
while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0))
Console.WriteLine("Worker {0} is stopping.", workerId);
}
我实际上最近一直在使用 ConcurrentQueue
,我想我会分享这个。我创建了一个名为 CQItems
的自定义 ConcurrentQueue
,它具有使用给定参数构建自身的方法。在内部,当您告诉它构建 x 数量的 y 项目时,它会对项目构造函数进行 Parallel.For
调用。这里的好处是,当一个方法或函数调用 CQItems myCQ = CQItems.Generate(x, y)
时,该调用来自基础应用程序线程,这意味着在完成构建之前,没有任何东西可以查看队列。但在队列 class 内部,它是用线程构建的,并且比仅使用 List<>
或 Queue<>
快得多。大多数情况下,它是凭空生成东西,但有时(基于参数)从 SQL 创建项目——基本上是根据现有数据生成对象。无论如何,这些是 CQItems
class 中可以帮助解决此问题的两种方法:
public void Generate(int numberOfItems = 1, ItemOptions options = ItemOptions.NONE)
{
try
{
Type t = typeof(T);
if (t == typeof(Item))
throw new ArgumentException("Type " + t + " is not valid for generation. Please contact QA.");
else
Parallel.For(0, numberOfItems, (i, loopState) =>
{
try
{
GenerateItemByType(typeof(T), options);
}
catch
{
loopState.Stop();
throw;
}
});
}
catch (AggregateException ae)
{
ae.Handle((x) =>
{
if (x is SQLNullResultsException)
{
throw x;
}
else if (x is ImageNotTIFFException)
{
throw x;
}
else
{
throw x;
}
return true;
});
}
catch
{
throw;
}
finally
{
ItemManager.Instance.Clear();
}
}
private void GenerateItemByType(Type t, ItemOptions options = ItemOptions.NONE)
{
try
{
if (t == typeof(ItemA))
{
if ((options & ItemOptions.DUPLICATE) != 0)
{
this.Enqueue(new ItemB(options));
}
else
{
this.Enqueue(new ItemA(options));
}
}
else if (t == typeof(ItemC))
{
this.Enqueue(new ItemC(options));
}
}
catch
{
throw;
}
finally { }
}
一些有用的笔记:
在 Parallel.For()
中提供 loopState
变量允许我们在捕获到异常时将状态设置为 stop。这很好,因为如果要求您的循环执行 1000 次操作,并且第 5 次迭代抛出异常,它将继续循环。你可能想要它,但在我的例子中,异常需要退出线程循环。您最终仍会得到一个 AggregateException
(显然,这正是线程抛出异常时发生的情况)。解析出那些并且只发送第一个可以节省大量时间和头痛试图通过一个巨大的异常组除草其中后来的异常可能(或可能不)是由第一个引起的。
至于重新抛出,我尝试为大多数预期类型的异常添加一个 catch 语句,即使我打算将它们扔到堆栈中也是如此。其中一部分用于故障排除(能够中断特定异常可能很方便)。部分原因是有时我希望能够做其他事情,例如停止循环、更改或添加异常消息,或者在拆分 AggregateException
的情况下,只发送一个异常备份堆栈而不是整个聚合。只是对可能正在查看此内容的任何人进行澄清。
最后,为了防止混淆,Type(T)
值来自我的 CQItems
class 本身:
public class CQItems<T> : ConcurrentQueue<Item>, IDisposable