ConcurrentQueue .Net:多线程消费者

ConcurrentQueue .Net: Multithreaded consumer

我有一个非常基本的问题,更多的是围绕 ConcurrentQueue 的概念。队列是 FIFO。当多个线程开始访问它时,我们如何保证FIFO? 假设我按顺序添加了 AppleOrangesLemonPeachApricot。第一个TryTake应该returnApple。但是当多个线程开始发出它们自己的 TryTake 请求时会发生什么?有没有可能一个线程可以 return Lemon 甚至在另一个线程可以 return Apple 之前?我假设其他项目也将 returned 直到队列为空。但是这些 return 会围绕 FIFO 的基本原则进行管理吗?

ConcurrentQueue 本身的行为总是先进先出。

当我们谈论来自 ConcurrentQueue 的线程 "returning" 项目时,我们谈论的是一个操作,该操作涉及将项目 执行一些操作一种使您能够观察已出列的操作的操作。无论是打印输出还是将该项目添加到另一个列表,在检查之前您实际上并不知道哪个项目已从队列中取出。

虽然队列本身是 FIFO,但您无法预测其他事件(例如检查出列项目)的发生顺序。这些项目将按 FIFO 出列,但您可能无法观察到按该顺序从队列中出来的内容。不同线程执行检查或输出的顺序可能与它们从队列中删除项目的顺序不完全相同。

换句话说,它会发生 FIFO,但它可能看起来总是这样,也可能不总是这样。如果处理项目的确切顺序很重要,您不会希望同时读取 ConcurrentQueue

如果您要对此进行测试(我正要写点什么),那么您可能会发现大部分时间项目都以精确的 FIFO 顺序进行处理,但偶尔它们不会.


这是一个控制台应用程序。即将

  • ConcurrentQueue 中插入 1-5000 的数字,单线程。
  • 执行并发操作以使每个项目出列并将它们移动到另一个 ConcurrentQueue这是"multithreaded consumer."
  • 读取第二个队列中的项目(同样是单线程)并报告任何乱序的数字。

很多次我 运行 它并没有出现乱序。但大约 50% 的时间它只报告一些乱序的数字。因此,如果您指望所有数字都按其原始顺序进行处理,那么大多数时候几乎所有数字都会发生这种情况。但后来就不行了。如果您不关心确切的顺序,那很好,但如果您这样做,就会出现错误且不可预测。

结论 - 不要依赖于多线程操作的确切顺序。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

namespace ConcurrentQueueExperiment
{
    class Program
    {
        static void Main(string[] args)
        {
            var inputQueue = new ConcurrentQueue<int>();
            var outputQueue = new ConcurrentQueue<int>();
            Enumerable.Range(1,5000).ToList().ForEach(inputQueue.Enqueue);
            while (inputQueue.Any())
            {
                Task.Factory.StartNew(() =>
                {
                    int dequeued;
                    if (inputQueue.TryDequeue(out dequeued))
                    {
                        outputQueue.Enqueue(dequeued);
                    }
                });
            }
            int output = 0;
            var previous = 0;
            while (outputQueue.TryDequeue(out output))
            {
                if(output!=previous+1)
                    Console.WriteLine("Out of sequence: {0}, {1}", previous, output);
                previous = output;
            }
            Console.WriteLine("Done!");
            Console.ReadLine();
        }
    }
}