如何同时从 BlockingCollection 中取出项目?

How to concurrently take items from the BlockingCollection?

我有 BlockingCollection 个整数。 下面是我正在尝试开发的代码,用于同时从 BlockingCollection.

中删除项目
static void Produce()
    {
        for (int i = 0; i < 100000; i++)
        {
            bc3.Add(i);
        }
        Run();
    }

    static void Run()
    {
        for (int i = 0; i < 5; i++)
        {
            Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
        }
    }

    static void Process()
    {
        var stopWatch = new Stopwatch();
        stopWatch.Start();
        while (bc3.Count!= 0)
        {
            bc3.Take();
        }
        stopWatch.Stop();
        Console.WriteLine("Thread {0}",
        Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("Elapsed Time {0}", stopWatch.Elapsed.Seconds);

    }

这是通过创建 5 个任务来更快地删除项目的正确方法吗?

秒表问题

你的测量结果是错误的,因为你使用了

stopWatch.Elapsed.Seconds

而不是

stopWatch.ElapsedMilliseconds

你只显示秒,忽略分钟、小时等

并发问题

不,这不是从 BlockingCollection 中删除项目的正确方法。不起作用的语句是

bc3.Count != 0

所有 5 个任务可能会同时检查此条件,发现还剩 1 个项目。他们都 5 去

bc3.Take();

一个任务可以移除项目,其他 4 个任务正在等待。

解决这个问题的一种方法是添加

bc3.CompleteAdding();

Produce().

垃圾收集问题

第一个任务完成后,Run() 方法完成,方法中的所有任务都超出范围并被垃圾收集。这可能会让您只看到 1 条而不是 5 条完成消息。

要解决此问题,请使用

    static void Run()
    {
        var tasks = new List<Task>();
        for (int i = 0; i < 5; i++)
        {
            tasks.Add(Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning));
        }
        try
        {
            Task.WaitAll(tasks.ToArray());
        }
        catch (AggregateException)
        { }        
    }

同步成本

这是我的输出之一,包含 5 个任务(和 100000000 个项目):

Thread 11
Thread 13
Thread 12
Thread 14
Thread 15
Elapsed Time 12878
Elapsed Time 13122
Elapsed Time 13128
Elapsed Time 13128
Elapsed Time 13128
Run: 13140

将此与单个任务进行比较:

Thread 12
Elapsed Time 10147
Run: 10149

这是因为只有一个 Take() 方法可以删除一个项目,并且同步需要额外的时间。