使用 4.0 框架 类 和阻塞收集的 C# 中的生产者/混合消费者
Producer / hybrid consumer in C# using 4.0 framework classes and Blocking Collection
我遇到了 producer/consumer 场景。生产者从不停止,这意味着即使有一段时间 BC 中没有项目,以后也可以添加更多项目。
从 .NET Framework 3.5 迁移到 4.0,我决定使用 BlockingCollection
作为消费者和生产者之间的并发队列。我什至添加了一些并行扩展,因此我可以将 BC 与 Parallel.ForEach
.
一起使用
问题是,在消费者线程中,我需要有一种混合模型:
- 我总是检查 BC 以处理任何到达的项目
Parallel.ForEach(bc.GetConsumingEnumerable(), item => etc
- 在这个
foreach
里面,我执行了所有相互不依赖的任务。
- 问题来了。在将前面的任务并行化之后,我需要按照它们在 BC 中的相同 FIFO 顺序来管理它们的结果。这些结果的处理应该在同步线程中进行。
伪代码中的一个小例子如下:
制作人:
//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
//The object to add has a property with the sequence number
_concurrentCollection.TryAdd(scannedPage);
}
消费者:
private void Init()
{
_cancelTasks = false;
_checkTask = Task.Factory.StartNew(() =>
{
while (!_cancelTasks)
{
//BlockingCollections with Parallel ForEach
var bc = _concurrentCollection;
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
}
});
}
显然,这不能按原样工作,因为 .GetConsumingEnumerable()
会阻塞,直到 BC 中有另一个项目。我假设我可以用任务来完成,并且在同一批次中触发 4 或 5 个任务,但是:
- 我怎样才能对任务执行此操作,并且在任务开始之前仍然有一个等待点,该等待点会阻塞直到 BC 中有要消耗的项目(如果什么都没有,我不想开始处理. 一旦 BC 中有东西,我将开始这批 4 个任务,并在每个任务中使用
TryTake
,这样如果没有什么可拿的,它们就不会阻塞,因为我不知道我是否总是可以从 BC 中达到项目的数量作为任务的批次,例如,BC 中只剩下一个项目和一批 4 个任务)?
- 我怎样才能做到这一点并利用 Parallel.For 提供的效率?
- 如何按照从 BC 中提取项目的相同 FIFO 顺序保存任务的结果?
- 是否还有其他并发性 class 更适合消费者中这种项目的混合处理?
- 此外,这是我在 Whosebug 中提出的第一个问题,所以如果您需要更多数据或者您认为我的问题不正确,请告诉我。
我想我按照你的要求做了,为什么不创建一个 ConcurrentBag 并在处理时添加到它:
while (!_cancelTasks)
{
//BlockingCollections with Paralell ForEach
var bc = _concurrentCollection;
var q = new ConcurrentBag<ScannedPage>();
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
q.Add(item);
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
//process items in your list here by sorting using some sequence key
var items = q.OrderBy( o=> o.SeqNbr).ToList();
foreach( var item in items){
...
}
}
这显然不会按照它们添加到 BC 中的确切顺序将它们排入队列,但您可以像 Alex 建议的那样向 ScannedPage 对象添加一些序列 nbr,然后对结果进行排序。
以下是我处理序列的方式:
将此添加到 ScannedPage
class:
public static int _counter; //public because this is just an example but it would work.
获取序列nbr并在此处赋值:
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
lock( this){ //to single thread this process.. not necessary if it's already single threaded of course.
System.Threading.Interlocked.Increment( ref ScannedPage._counter);
scannedPage.SeqNbr = ScannedPage._counter;
...
}
}
每当您需要并行操作的结果时,使用 PLINQ 通常比使用 Parallel
class 更方便。以下是使用 PLINQ 重构代码的方法:
private void Init()
{
_cancelTasks = new CancellationTokenSource();
_checkTask = Task.Run(() =>
{
while (true)
{
_cancelTasks.Token.ThrowIfCancellationRequested();
var bc = _concurrentCollection;
var partitioner = Partitioner.Create(
bc.GetConsumingEnumerable(_cancelTasks.Token),
EnumerablePartitionerOptions.NoBuffering);
ScannedPage[] results = partitioner
.AsParallel()
.AsOrdered()
.Select(scannedPage =>
{
// Process the scannedPage
return scannedPage;
})
.ToArray();
// Process the results
}
});
}
.AsOrdered()
可确保您获得的结果与输入的顺序相同。
请注意,当您使用 Parallel
class 或 PLINQ 使用 BlockingCollection<T>
时,使用 Partitioner
和 EnumerablePartitionerOptions.NoBuffering
configuration, otherwise there is a risk of deadlocks 很重要. Parallel
/PLINQ 的默认贪婪行为和 BlockingCollection<T>
的阻塞行为不能很好地交互。
我遇到了 producer/consumer 场景。生产者从不停止,这意味着即使有一段时间 BC 中没有项目,以后也可以添加更多项目。
从 .NET Framework 3.5 迁移到 4.0,我决定使用 BlockingCollection
作为消费者和生产者之间的并发队列。我什至添加了一些并行扩展,因此我可以将 BC 与 Parallel.ForEach
.
问题是,在消费者线程中,我需要有一种混合模型:
- 我总是检查 BC 以处理任何到达的项目
Parallel.ForEach(bc.GetConsumingEnumerable(), item => etc
- 在这个
foreach
里面,我执行了所有相互不依赖的任务。 - 问题来了。在将前面的任务并行化之后,我需要按照它们在 BC 中的相同 FIFO 顺序来管理它们的结果。这些结果的处理应该在同步线程中进行。
伪代码中的一个小例子如下:
制作人:
//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
//The object to add has a property with the sequence number
_concurrentCollection.TryAdd(scannedPage);
}
消费者:
private void Init()
{
_cancelTasks = false;
_checkTask = Task.Factory.StartNew(() =>
{
while (!_cancelTasks)
{
//BlockingCollections with Parallel ForEach
var bc = _concurrentCollection;
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
}
});
}
显然,这不能按原样工作,因为 .GetConsumingEnumerable()
会阻塞,直到 BC 中有另一个项目。我假设我可以用任务来完成,并且在同一批次中触发 4 或 5 个任务,但是:
- 我怎样才能对任务执行此操作,并且在任务开始之前仍然有一个等待点,该等待点会阻塞直到 BC 中有要消耗的项目(如果什么都没有,我不想开始处理. 一旦 BC 中有东西,我将开始这批 4 个任务,并在每个任务中使用
TryTake
,这样如果没有什么可拿的,它们就不会阻塞,因为我不知道我是否总是可以从 BC 中达到项目的数量作为任务的批次,例如,BC 中只剩下一个项目和一批 4 个任务)? - 我怎样才能做到这一点并利用 Parallel.For 提供的效率?
- 如何按照从 BC 中提取项目的相同 FIFO 顺序保存任务的结果?
- 是否还有其他并发性 class 更适合消费者中这种项目的混合处理?
- 此外,这是我在 Whosebug 中提出的第一个问题,所以如果您需要更多数据或者您认为我的问题不正确,请告诉我。
我想我按照你的要求做了,为什么不创建一个 ConcurrentBag 并在处理时添加到它:
while (!_cancelTasks)
{
//BlockingCollections with Paralell ForEach
var bc = _concurrentCollection;
var q = new ConcurrentBag<ScannedPage>();
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
q.Add(item);
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
//process items in your list here by sorting using some sequence key
var items = q.OrderBy( o=> o.SeqNbr).ToList();
foreach( var item in items){
...
}
}
这显然不会按照它们添加到 BC 中的确切顺序将它们排入队列,但您可以像 Alex 建议的那样向 ScannedPage 对象添加一些序列 nbr,然后对结果进行排序。
以下是我处理序列的方式:
将此添加到 ScannedPage
class:
public static int _counter; //public because this is just an example but it would work.
获取序列nbr并在此处赋值:
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
lock( this){ //to single thread this process.. not necessary if it's already single threaded of course.
System.Threading.Interlocked.Increment( ref ScannedPage._counter);
scannedPage.SeqNbr = ScannedPage._counter;
...
}
}
每当您需要并行操作的结果时,使用 PLINQ 通常比使用 Parallel
class 更方便。以下是使用 PLINQ 重构代码的方法:
private void Init()
{
_cancelTasks = new CancellationTokenSource();
_checkTask = Task.Run(() =>
{
while (true)
{
_cancelTasks.Token.ThrowIfCancellationRequested();
var bc = _concurrentCollection;
var partitioner = Partitioner.Create(
bc.GetConsumingEnumerable(_cancelTasks.Token),
EnumerablePartitionerOptions.NoBuffering);
ScannedPage[] results = partitioner
.AsParallel()
.AsOrdered()
.Select(scannedPage =>
{
// Process the scannedPage
return scannedPage;
})
.ToArray();
// Process the results
}
});
}
.AsOrdered()
可确保您获得的结果与输入的顺序相同。
请注意,当您使用 Parallel
class 或 PLINQ 使用 BlockingCollection<T>
时,使用 Partitioner
和 EnumerablePartitionerOptions.NoBuffering
configuration, otherwise there is a risk of deadlocks 很重要. Parallel
/PLINQ 的默认贪婪行为和 BlockingCollection<T>
的阻塞行为不能很好地交互。