使用 BlockingCollection 作为队列等待消费者中的异步方法

awaiting async method in consumer using BlockingCollection as queue

我正在开发一个服务器端控制台应用程序,该应用程序从多个 WCF 服务接收数据,对其进行一些处理,然后使用 SignalR 通过单个连接将结果转发到 IIS 服务器。

我尝试使用生产者消费者模式来实现这一点,其中 WCF 服务是生产者,class 使用 SignalR 发送数据是消费者。对于队列,我使用了 BlockingCollection.

但是,当使用 await/async 发送数据时,消费者的 while 循环会卡住,直到所有其他线程都完成向队列添加数据。

出于测试目的,我用 Task.Delay(1000).Wait();await Task.Delay(1000); 替换了实际发送数据的代码,它们也都卡住了。 一个简单的 Thread.Sleep(1000); 似乎工作得很好,让我认为异步代码是问题所在。

所以我的问题是:是否有什么东西阻止异步代码在 while 循环中完成?我错过了什么?

我正在这样启动消费者线程:

new Thread(Worker).Start();

消费者代码:

private void Worker()
{
    while (!_queue.IsCompleted)
    {
        IMobileMessage msg = null;
        try
        {
            msg = _queue.Take();
        }
        catch (InvalidOperationException)
        {
        }

        if (msg != null)
        {
            try
            {
                Trace.TraceInformation("Sending: {0}", msg.Name);
                Thread.Sleep(1000); // <-- works
                //Task.Delay(1000).Wait(); // <-- doesn't work
                msg.SentTime = DateTime.UtcNow;
                Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
            }
            catch (Exception e)
            {
                TraceException(e);
            }
        }
    }
}

正如 spender 正确指出的那样,BlockingCollection(顾名思义)仅用于阻塞代码,不适用于异步代码。

有异步兼容的 producer/consumer 队列,例如 BufferBlock<T>。在这种情况下,我认为 ActionBlock<T> 会更好:

private ActionBlock<IMobileMsg> _block = new ActionBlock<IMobileMsg>(async msg =>
{
  try
  {
    Trace.TraceInformation("Sending: {0}", msg.Name);
    await Task.Delay(1000);
    msg.SentTime = DateTime.UtcNow;
    Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
  }
  catch (Exception e)
  {
    TraceException(e);
  }
});

这将替换您的整个消费线程和主循环。