BlockingCollection Take() 永远阻塞
BlockingCollection Take() Blocking Forever
我有一个使用阻塞集合的简单生产者消费者设置。在我们的应用程序运行期间,消费者处于循环中,等待消费者将项目放入集合中,然后取出项目并将其写入串行端口。出于某种原因,当集合中有项目时,collection.Take() 会永远阻塞。对于这个应用程序,我们可能同时有一个或多个 ProducerConsumers 处于活动状态。不管怎样,他们的行为都是一样的。
public class ProducerConsumer
{
private Task _backgroundWorker;
private CancellationTokenSource _cancellationTokenSource;
private BlockingCollection<Data> _dataQueue;
public ProducerConsumer()
{
_dataQueue = new BlockingCollection<Data>();
_cancellationTokenSource = new CancellationTokenSource();
_backgroundWorker = new Task(() => DoWork(_cancellationTokenSource.Token), TaskCreationOptions.LongRunning);
_backgroundWorker.Start();
}
public void AddData(Data data)
{
_dataQueue.Add(data);
System.Diagnostics.Debug.WriteLine(_dataQueue.Count);
}
private void DoWork(CancellationToken cancellationToken)
{
while(!cancellationToken.IsCancellationRequested)
{
try
{
_dataQueue.Take(cancellationToken); //This is blocking forever
//DoWork
}
catch(OperationCanceledException) { }
catch(Exception e)
{
System.Diagnostics.Debug.WriteLine(e.ToString());
throw;
}
}
}
}
当 运行 此打印语句递增时,我们肯定在集合中有数据,但无论出于何种原因,Take() 继续阻塞。
它也没有抛出异常。
使用 Dispose() 请求取消,但我没有在此处添加。不叫早叫。
我试过使用 .GetConsumingEnumerable() 并且它也永远阻塞。
我是不是开始任务有误?我会 运行 没线了吗?
我考虑过使用 BackgroundWorker 而不是 Task,但根据 MSFT Task 是首选。
提前致谢。
首先,我不会尝试创建自己的 producer/consumer 实现,尤其是不会阻塞的实现。使用 ActionBlock 可以轻松处理简单的 producer/consumer 场景。 ActionBlock 有一个内部队列,多个并发生产者可以向其发送 post 消息。 ActionbBlock 将使用传递给其构造函数的 worker 方法在后台处理排队的消息:
class SerialWorker
{
ActionBlock<Data> _serialBlock;
public SerialWorker()
{
_serialBlock=new ActionBlock<Data>(data=>DoWork(data));
}
//The worker action can be synchronous
private void DoWork(Data data)
{
}
//or asynchronous
private async Task DoWorkAsync(Data data)
{
}
//Producer Code
//While the application runs :
public void PostData(Data data)
{
_serialBlock.Post(someData);
}
//When the application finishes
//Tell the block to shut down and wait for it to process any leftover requests
public async Task Shutdown()
{
_serialBlock.Complete();
await _serialBlock.Completion;
}
worker 方法可以是异步的,例如 new ActionBlock<Data>(data=>DoWorkAsync(data))
就可以正常工作。这允许使用异步方法而不会在 worker 内部阻塞。
新消息 post 编辑 ActionBlock.Post
。当需要关闭时,应用程序应调用 Complete()
通知操作块并等待它完成。 ActionBlock 将停止接收更多消息并在终止前处理仍留在其缓冲区中的任何内容。
我有一个使用阻塞集合的简单生产者消费者设置。在我们的应用程序运行期间,消费者处于循环中,等待消费者将项目放入集合中,然后取出项目并将其写入串行端口。出于某种原因,当集合中有项目时,collection.Take() 会永远阻塞。对于这个应用程序,我们可能同时有一个或多个 ProducerConsumers 处于活动状态。不管怎样,他们的行为都是一样的。
public class ProducerConsumer
{
private Task _backgroundWorker;
private CancellationTokenSource _cancellationTokenSource;
private BlockingCollection<Data> _dataQueue;
public ProducerConsumer()
{
_dataQueue = new BlockingCollection<Data>();
_cancellationTokenSource = new CancellationTokenSource();
_backgroundWorker = new Task(() => DoWork(_cancellationTokenSource.Token), TaskCreationOptions.LongRunning);
_backgroundWorker.Start();
}
public void AddData(Data data)
{
_dataQueue.Add(data);
System.Diagnostics.Debug.WriteLine(_dataQueue.Count);
}
private void DoWork(CancellationToken cancellationToken)
{
while(!cancellationToken.IsCancellationRequested)
{
try
{
_dataQueue.Take(cancellationToken); //This is blocking forever
//DoWork
}
catch(OperationCanceledException) { }
catch(Exception e)
{
System.Diagnostics.Debug.WriteLine(e.ToString());
throw;
}
}
}
}
当 运行 此打印语句递增时,我们肯定在集合中有数据,但无论出于何种原因,Take() 继续阻塞。
它也没有抛出异常。
使用 Dispose() 请求取消,但我没有在此处添加。不叫早叫。
我试过使用 .GetConsumingEnumerable() 并且它也永远阻塞。
我是不是开始任务有误?我会 运行 没线了吗?
我考虑过使用 BackgroundWorker 而不是 Task,但根据 MSFT Task 是首选。
提前致谢。
首先,我不会尝试创建自己的 producer/consumer 实现,尤其是不会阻塞的实现。使用 ActionBlock 可以轻松处理简单的 producer/consumer 场景。 ActionBlock 有一个内部队列,多个并发生产者可以向其发送 post 消息。 ActionbBlock 将使用传递给其构造函数的 worker 方法在后台处理排队的消息:
class SerialWorker
{
ActionBlock<Data> _serialBlock;
public SerialWorker()
{
_serialBlock=new ActionBlock<Data>(data=>DoWork(data));
}
//The worker action can be synchronous
private void DoWork(Data data)
{
}
//or asynchronous
private async Task DoWorkAsync(Data data)
{
}
//Producer Code
//While the application runs :
public void PostData(Data data)
{
_serialBlock.Post(someData);
}
//When the application finishes
//Tell the block to shut down and wait for it to process any leftover requests
public async Task Shutdown()
{
_serialBlock.Complete();
await _serialBlock.Completion;
}
worker 方法可以是异步的,例如 new ActionBlock<Data>(data=>DoWorkAsync(data))
就可以正常工作。这允许使用异步方法而不会在 worker 内部阻塞。
新消息 post 编辑 ActionBlock.Post
。当需要关闭时,应用程序应调用 Complete()
通知操作块并等待它完成。 ActionBlock 将停止接收更多消息并在终止前处理仍留在其缓冲区中的任何内容。