异步线程死于 TPL 等待

Async thread dies on TPL await

我正在编写一个简单的 producer/consumer 应用程序,但我注意到一个非常奇怪的行为..这是代码:

private Thread _timelineThread = null;
private BufferBlock<RtpPacket> _queue = null;
private AutoResetEvent _stopping = new AutoResetEvent(false);

static void Main(string[] args)
{  
  // Start consumer thread
  Consume();
  
  // Produce
  var t = new Thread(() =>
  {
    while (true)
    {
      var packet = RtpPacket.GetNext();
      _queue.Post(packet);
      Thread.Sleep(70);
    }
  }
  t.Join();
}

static void Consume()
{
  _timelineThread = new Thread(async () =>
  {
    while (_stopping.WaitOne(0) == false)
    {
      // Start consuming...
      while (await _queue.OutputAvailableAsync())
      {
        var packet = await _queue.ReceiveAsync();
        // Some processing...
      }
    }
  });
  _timelineThread.Start();   
}

这是一个无限循环(直到我路由 _stopping 信号)。但是,当 _timelineThread 遇到第一个 await _queue.OutputAvailableAsync() 时,线程将状态更改为 'Stopped'。有什么我没有考虑的问题吗?

如果我将 Consume() 函数更改为:

static void Consume()
{
  _timelineThread = new Thread(() =>
  {
    while (_stopping.WaitOne(0) == false)
    {
      // Start consuming...
      while (_queue.OutputAvailableAsync().GetAwaiter().GetResult())
      {
        var packet = _queue.ReceiveAsync().GetAwaiter().GetResult();
        // Some processing...
      }
    }
  });
  _timelineThread.Start();   
}

线程运行没有任何问题..但代码与上一个几乎相同..

编辑:一小时后这个 'hack' 似乎也不起作用..线程是 'Running' 但我没有从队列中收到任何数据..

Thread 构造函数不理解 async 委托。您可以在这里阅读相关内容:

我的建议是使用同步BlockingCollection<RtpPacket> instead of the BufferBlock<RtpPacket>, and consume it by enumerating the GetConsumingEnumerable方法:

var _queue = new BlockingCollection<RtpPacket>();

var producer = new Thread(() =>
{
    while (true)
    {
        var packet = RtpPacket.GetNext();
        if (packet == null) { _queue.CompleteAdding(); break; }
        _queue.Add(packet);
        Thread.Sleep(70);
    }
});

var consumer = new Thread(() =>
{
    foreach (var packet in _queue.GetConsumingEnumerable())
    {
        // Some processing...
    }
});

producer.Start();
consumer.Start();

producer.Join();
consumer.Join();