使用 4.0 框架 类 和阻塞收集的 C# 中的生产者/混合消费者

Producer / hybrid consumer in C# using 4.0 framework classes and Blocking Collection

我遇到了 producer/consumer 场景。生产者从不停止,这意味着即使有一段时间 BC 中没有项目,以后也可以添加更多项目。

从 .NET Framework 3.5 迁移到 4.0,我决定使用 BlockingCollection 作为消费者和生产者之间的并发队列。我什至添加了一些并行扩展,因此我可以将 BC 与 Parallel.ForEach.

一起使用

问题是,在消费者线程中,我需要有一种混合模型:

  1. 我总是检查 BC 以处理任何到达的项目 Parallel.ForEach(bc.GetConsumingEnumerable(), item => etc
  2. 在这个foreach里面,我执行了所有相互不依赖的任务。
  3. 问题来了。在将前面的任务并行化之后,我需要按照它们在 BC 中的相同 FIFO 顺序来管理它们的结果。这些结果的处理应该在同步线程中进行。

伪代码中的一个小例子如下:

制作人:

//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
     //The object to add has a property with the sequence number
    _concurrentCollection.TryAdd(scannedPage);
}

消费者:

private void Init()
{
    _cancelTasks = false;
    _checkTask = Task.Factory.StartNew(() =>
            {
                while (!_cancelTasks)
                {
                    //BlockingCollections with Parallel ForEach
                    var bc = _concurrentCollection;
                    Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
                    {
                        ScannedPage currentPage = item;
                        // process a batch of images from the bc and check if an image has a valid barcode. T
                    });
                    //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.

                }
            });
}
            

显然,这不能按原样工作,因为 .GetConsumingEnumerable() 会阻塞,直到 BC 中有另一个项目。我假设我可以用任务来完成,并且在同一批次中触发 4 或 5 个任务,但是:

  1. 我怎样才能对任务执行此操作,并且在任务开始之前仍然有一个等待点,该等待点会阻塞直到 BC 中有要消耗的项目(如果什么都没有,我不想开始处理. 一旦 BC 中有东西,我将开始这批 4 个任务,并在每个任务中使用 TryTake,这样如果没有什么可拿的,它们就不会阻塞,因为我不知道我是否总是可以从 BC 中达到项目的数量作为任务的批次,例如,BC 中只剩下一个项目和一批 4 个任务)?
  2. 我怎样才能做到这一点并利用 Parallel.For 提供的效率?
  3. 如何按照从 BC 中提取项目的相同 FIFO 顺序保存任务的结果?
  4. 是否还有其他并发性 class 更适合消费者中这种项目的混合处理?
  5. 此外,这是我在 Whosebug 中提出的第一个问题,所以如果您需要更多数据或者您认为我的问题不正确,请告诉我。

我想我按照你的要求做了,为什么不创建一个 ConcurrentBag 并在处理时添加到它:

while (!_cancelTasks)
{
   //BlockingCollections with Paralell ForEach
   var bc = _concurrentCollection;
   var q = new ConcurrentBag<ScannedPage>();
   Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
   {
      ScannedPage currentPage = item;
      q.Add(item);
      // process a batch of images from the bc and check if an image has a valid barcode. T
   });
 //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.


  //process items in your list here by sorting using some sequence key
  var items = q.OrderBy( o=> o.SeqNbr).ToList();
  foreach( var item in items){
     ...
  }
}

这显然不会按照它们添加到 BC 中的确切顺序将它们排入队列,但您可以像 Alex 建议的那样向 ScannedPage 对象添加一些序列 nbr,然后对结果进行排序。

以下是我处理序列的方式:

将此添加到 ScannedPage class:

public static int _counter;  //public because this is just an example but it would work.

获取序列nbr并在此处赋值:

private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
    lock( this){   //to single thread this process.. not necessary if it's already single threaded of course.
    System.Threading.Interlocked.Increment( ref ScannedPage._counter);
    scannedPage.SeqNbr = ScannedPage._counter;
    ...
    }
}

每当您需要并行操作的结果时,使用 PLINQ 通常比使用 Parallel class 更方便。以下是使用 PLINQ 重构代码的方法:

private void Init()
{
    _cancelTasks = new CancellationTokenSource();
    _checkTask = Task.Run(() =>
    {
        while (true)
        {
            _cancelTasks.Token.ThrowIfCancellationRequested();

            var bc = _concurrentCollection;
            var partitioner = Partitioner.Create(
                bc.GetConsumingEnumerable(_cancelTasks.Token),
                EnumerablePartitionerOptions.NoBuffering);

            ScannedPage[] results = partitioner
                .AsParallel()
                .AsOrdered()
                .Select(scannedPage =>
                {
                    // Process the scannedPage
                    return scannedPage;
                })
                .ToArray();

            // Process the results
        }
    });
}

.AsOrdered() 可确保您获得的结果与输入的顺序相同。

请注意,当您使用 Parallel class 或 PLINQ 使用 BlockingCollection<T> 时,使用 PartitionerEnumerablePartitionerOptions.NoBuffering configuration, otherwise there is a risk of deadlocks 很重要. Parallel/PLINQ 的默认贪婪行为和 BlockingCollection<T> 的阻塞行为不能很好地交互。