BlockingCollection Take() 永远阻塞

BlockingCollection Take() Blocking Forever

我有一个使用阻塞集合的简单生产者消费者设置。在我们的应用程序运行期间,消费者处于循环中,等待消费者将项目放入集合中,然后取出项目并将其写入串行端口。出于某种原因,当集合中有项目时,collection.Take() 会永远阻塞。对于这个应用程序,我们可能同时有一个或多个 ProducerConsumers 处于活动状态。不管怎样,他们的行为都是一样的。

public class ProducerConsumer 
{
    private Task _backgroundWorker;
    private CancellationTokenSource _cancellationTokenSource;
    private BlockingCollection<Data> _dataQueue;

    public ProducerConsumer() 
    {
        _dataQueue = new BlockingCollection<Data>();
        _cancellationTokenSource = new CancellationTokenSource();
        _backgroundWorker = new Task(() => DoWork(_cancellationTokenSource.Token), TaskCreationOptions.LongRunning);
        _backgroundWorker.Start();
    }

    public void AddData(Data data) 
    {
        _dataQueue.Add(data);
        System.Diagnostics.Debug.WriteLine(_dataQueue.Count);
    }

    private void DoWork(CancellationToken cancellationToken)
    {
        while(!cancellationToken.IsCancellationRequested)
        {
            try
            {
                _dataQueue.Take(cancellationToken); //This is blocking forever

                //DoWork
            }
            catch(OperationCanceledException) { }
            catch(Exception e)
            {
                System.Diagnostics.Debug.WriteLine(e.ToString());
                throw;
            }
        }
    }  
}

当 运行 此打印语句递增时,我们肯定在集合中有数据,但无论出于何种原因,Take() 继续阻塞。

它也没有抛出异常。

使用 Dispose() 请求取消,但我没有在此处添加。不叫早叫。

我试过使用 .GetConsumingEnumerable() 并且它也永远阻塞。

我是不是开始任务有误?我会 运行 没线了吗?

我考虑过使用 BackgroundWorker 而不是 Task,但根据 MSFT Task 是首选。

提前致谢。

首先,我不会尝试创建自己的 producer/consumer 实现,尤其是不会阻塞的实现。使用 ActionBlock 可以轻松处理简单的 producer/consumer 场景。 ActionBlock 有一个内部队列,多个并发生产者可以向其发送 post 消息。 ActionbBlock 将使用传递给其构造函数的 worker 方法在后台处理排队的消息:

class SerialWorker
{
    ActionBlock<Data>  _serialBlock;

    public SerialWorker()
    {    
        _serialBlock=new ActionBlock<Data>(data=>DoWork(data));
    }

    //The worker action can be synchronous 
    private void DoWork(Data data)
    {
    }
    //or asynchronous
    private async Task DoWorkAsync(Data data)
    {
    }


    //Producer Code
    //While the application runs :
    public void PostData(Data data)
    {
        _serialBlock.Post(someData);
    }

//When the application finishes 
//Tell the block to shut down and wait for it to process any leftover requests
    public async Task Shutdown()
    {
        _serialBlock.Complete();    
        await _serialBlock.Completion;
    }

worker 方法可以是异步的,例如 new ActionBlock<Data>(data=>DoWorkAsync(data)) 就可以正常工作。这允许使用异步方法而不会在 worker 内部阻塞。

新消息 post 编辑 ActionBlock.Post。当需要关闭时,应用程序应调用 Complete() 通知操作块并等待它完成。 ActionBlock 将停止接收更多消息并在终止前处理仍留在其缓冲区中的任何内容。