与多个消费者一起使用 BlockingCollection

Consuming BlockingCollection with multiple consumers

我有一个程序如下

 class Program
    {
        public static int TaskCount { get; set; }
        public static BlockingCollection<string> queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
        static void Main(string[] args)
        {
            TaskCount = 3;
            Task.Factory.StartNew(() => Producer());

            for (int i = 0; i < TaskCount; i++)
                Task.Factory.StartNew(() => Consumer());
            Console.ReadKey();
        }

        private static void Producer()
        {
            using (StreamWriter sw = File.AppendText(@"C:\pcadder.txt"))
            {
                for (int i = 0; i < 15; i++)
                {
                    queue.Add("Item: " + (i+1).ToString());
                    var message = string.Format("{2}.Item added: Item {0} at {1}", (i+1).ToString(), DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),i+1);
                    Console.WriteLine(message);
                    sw.WriteLine(message);

                }
                queue.CompleteAdding();
            }
        }
        private static void Consumer()
        {
            int count = 1;
            foreach (var item in queue.GetConsumingEnumerable())
            {
                var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
                        Thread.CurrentThread.ManagedThreadId,count);
                Console.WriteLine(message);

                using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
                    sw.WriteLine(message);
                count += 1;
            }
        }
    }

输出

1.Item added: Item 1 at 2017.07.06 09:58:49.784734
2.Item added: Item 2 at 2017.07.06 09:58:49.784734
3.Item added: Item 3 at 2017.07.06 09:58:49.784734
4.Item added: Item 4 at 2017.07.06 09:58:49.784734
5.Item added: Item 5 at 2017.07.06 09:58:49.784734
6.Item added: Item 6 at 2017.07.06 09:58:49.784734
7.Item added: Item 7 at 2017.07.06 09:58:49.784734
8.Item added: Item 8 at 2017.07.06 09:58:49.784734
9.Item added: Item 9 at 2017.07.06 09:58:49.784734
10.Item added: Item 10 at 2017.07.06 09:58:49.784734
11.Item added: Item 11 at 2017.07.06 09:58:49.784734
12.Item added: Item 12 at 2017.07.06 09:58:49.784734
13.Item added: Item 13 at 2017.07.06 09:58:49.784734
14.Item added: Item 14 at 2017.07.06 09:58:49.784734
15.Item added: Item 15 at 2017.07.06 09:58:49.784734

1.Item taken: Item: 3 at 2017.07.06 09:58:49.784734 by thread 7.
1.Item taken: Item: 2 at 2017.07.06 09:58:49.784734 by thread 4.
1.Item taken: Item: 1 at 2017.07.06 09:58:49.784734 by thread 5.
2.Item taken: Item: 5 at 2017.07.06 09:58:49.784734 by thread 4.
2.Item taken: Item: 4 at 2017.07.06 09:58:49.784734 by thread 7.
2.Item taken: Item: 6 at 2017.07.06 09:58:49.784734 by thread 5.
3.Item taken: Item: 7 at 2017.07.06 09:58:49.784734 by thread 4.
3.Item taken: Item: 8 at 2017.07.06 09:58:49.784734 by thread 7.
3.Item taken: Item: 9 at 2017.07.06 09:58:49.784734 by thread 5.
4.Item taken: Item: 11 at 2017.07.06 09:58:49.784734 by thread 7.
4.Item taken: Item: 12 at 2017.07.06 09:58:49.784734 by thread 5.
5.Item taken: Item: 13 at 2017.07.06 09:58:49.784734 by thread 7.
5.Item taken: Item: 14 at 2017.07.06 09:58:49.784734 by thread 5.
6.Item taken: Item: 15 at 2017.07.06 09:58:49.784734 by thread 7.

在几乎每个 运行 程序之后,我的消费者日志中都缺少一项。(这里缺少 Item 10)。我不明白为什么会这样。

  1. 这件商品怎么没有处理?
  2. 当使用多个任务作为消费者时,按顺序处理项目 (FIFO) 是否被破坏了?如果我想 keep/force 在消费者方法中以 FIFO 顺序处理,我应该避免使用多个任务吗? (处理可能包括I/O,网络操作)

这里

using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
    sw.WriteLine(message);

您从多个线程快速写入同一个文件。这不是一个好主意,这段代码实际上会抛出一个异常。它在您的代码中不会被注意到,因为您不处理任何异常并且它发生在后台线程中,因此不会使您的应用程序崩溃。这回答了为什么您的日志中缺少项目。您可以写入同一个文件,例如:

// create it outside `Consumer` and make synchronized
using (var taker = TextWriter.Synchronized(File.AppendText(@"pctaker.txt"))) {
    TaskCount = 3;
    Task.Factory.StartNew(() => Producer());
    //Producer();
    for (int i = 0; i < TaskCount; i++)
        // pass to consumer
        Task.Factory.StartNew(() => Consumer(taker));
    Console.ReadKey();
}

private static void Consumer(TextWriter writer)
{
    int count = 1;
    foreach (var item in queue.GetConsumingEnumerable())
    {
        var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
                Thread.CurrentThread.ManagedThreadId, count);
        Console.WriteLine(message);                                
        writer.WriteLine(message);
        writer.Flush();
        count += 1;
    }
}

或者只是在写入文件的地方加上 lock

至于第二个问题 - 消费者仍然以 FIFO 顺序拉取物品,但由于你有多个消费者 - 处理顺序当然不能保证,因为所有消费者并行处理物品。消费者A拉取物品1,消费者B同时拉取物品2。消费者 A 需要 100 毫秒来处理项目 1,消费者 B 需要 10 毫秒来处理项目 2。结果 - 在项目 1 之前处理项目 2(即 - 写入您的日志)。

如果你想确保并行处理块的输出与其输入的顺序相同,如果你想使用 BlockingCollection,你将不得不乱搞诸如优先队列之类的东西。

但是,如果您准备使用更现代的 DataFlow library(任务并行库的一部分),则有一种更简单的方法。

这是一个示例程序。请注意,这使用 await 但这不是使用 DataFlow 所必需的。队列中的项目类型是 int,但您可以指定任何类型 - int 恰好是一种简单的演示方式。

关于这个示例程序需要注意的重要一点是输入项由多个线程并行处理,但最终输出仍然与输入时的顺序相同。

如果您查看输出 "Returning X from thread Y",您会发现 X 并不总是与它排队的顺序相同。工作线程可以return非输入顺序的数据。

但是,如果您查看输出 "Outputting X",您会发现 X 的顺序与其排队的顺序相同(单调递增)。

输出队列已确保输出顺序正确。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp1
{
    public class Program
    {
        static void Main()
        {
            var inQueue  = new TransformBlock<int, int>(item => process(item), processBlockOptions());
            var outQueue = new ActionBlock<int>(item => output(item), outputBlockOptions());

            inQueue.LinkTo(outQueue, new DataflowLinkOptions {PropagateCompletion = true});

            var task = queueData(inQueue);

            Console.WriteLine("Waiting for task to complete in thread " + Thread.CurrentThread.ManagedThreadId);
            task.Wait();
            Console.WriteLine("Completed.");
        }

        static async Task queueData(TransformBlock<int, int> executor)
        {
            await enqueue(executor);

            Console.WriteLine("Indicating that no more data will be queued.");
            executor.Complete(); // Indicate that no more items will be queued.

            Console.WriteLine("Waiting for queue to empty.");
            await executor.Completion; // Wait for executor queue to empty.
        }

        static async Task enqueue(TransformBlock<int, int> executor)
        {
            for (int i = 0; i < 100; ++i)
            {
                Console.WriteLine("Queuing data " + i);
                int v = i;
                await executor.SendAsync(v); // Queues a method that returns v.
            }
        }

        static int process(int value)  // Procss value by adding 1000 to it.
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing item {value}");
            value += 1000;
            Thread.Sleep(150+nextRand());  // Simulate work.
            Console.WriteLine($"Returning {value} from thread {Thread.CurrentThread.ManagedThreadId}");

            return value;
        }

        static void output(int value)
        {
            Console.WriteLine($"Outputting {value}");
        }

        static ExecutionDataflowBlockOptions processBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity = 8
            };
        }

        static ExecutionDataflowBlockOptions outputBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        static int nextRand()
        {
            lock (rngLock)
            {
                return rng.Next(250);
            }
        }

        static Random rng = new Random();
        static object rngLock = new object();
    }
}

玩转 MaxDegreeOfParallelismBoundedCapacity return 的值很有趣 processBlockOptions()

例如,尝试使用 MaxDegreeOfParallelism 8BoundedCapacity = 16


[编辑] 回答你关于 "How does not this item processed?" 的问题 - 我怀疑这是因为你的输出日志记录不是线程安全的(根据 Evk 的好回答)