使用 BlockingCollection 作为队列等待消费者中的异步方法
awaiting async method in consumer using BlockingCollection as queue
我正在开发一个服务器端控制台应用程序,该应用程序从多个 WCF 服务接收数据,对其进行一些处理,然后使用 SignalR 通过单个连接将结果转发到 IIS 服务器。
我尝试使用生产者消费者模式来实现这一点,其中 WCF 服务是生产者,class 使用 SignalR 发送数据是消费者。对于队列,我使用了 BlockingCollection.
但是,当使用 await/async 发送数据时,消费者的 while 循环会卡住,直到所有其他线程都完成向队列添加数据。
出于测试目的,我用 Task.Delay(1000).Wait();
或 await Task.Delay(1000);
替换了实际发送数据的代码,它们也都卡住了。
一个简单的 Thread.Sleep(1000);
似乎工作得很好,让我认为异步代码是问题所在。
所以我的问题是:是否有什么东西阻止异步代码在 while 循环中完成?我错过了什么?
我正在这样启动消费者线程:
new Thread(Worker).Start();
消费者代码:
private void Worker()
{
while (!_queue.IsCompleted)
{
IMobileMessage msg = null;
try
{
msg = _queue.Take();
}
catch (InvalidOperationException)
{
}
if (msg != null)
{
try
{
Trace.TraceInformation("Sending: {0}", msg.Name);
Thread.Sleep(1000); // <-- works
//Task.Delay(1000).Wait(); // <-- doesn't work
msg.SentTime = DateTime.UtcNow;
Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
}
catch (Exception e)
{
TraceException(e);
}
}
}
}
正如 spender 正确指出的那样,BlockingCollection
(顾名思义)仅用于阻塞代码,不适用于异步代码。
有异步兼容的 producer/consumer 队列,例如 BufferBlock<T>
。在这种情况下,我认为 ActionBlock<T>
会更好:
private ActionBlock<IMobileMsg> _block = new ActionBlock<IMobileMsg>(async msg =>
{
try
{
Trace.TraceInformation("Sending: {0}", msg.Name);
await Task.Delay(1000);
msg.SentTime = DateTime.UtcNow;
Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
}
catch (Exception e)
{
TraceException(e);
}
});
这将替换您的整个消费线程和主循环。
我正在开发一个服务器端控制台应用程序,该应用程序从多个 WCF 服务接收数据,对其进行一些处理,然后使用 SignalR 通过单个连接将结果转发到 IIS 服务器。
我尝试使用生产者消费者模式来实现这一点,其中 WCF 服务是生产者,class 使用 SignalR 发送数据是消费者。对于队列,我使用了 BlockingCollection.
但是,当使用 await/async 发送数据时,消费者的 while 循环会卡住,直到所有其他线程都完成向队列添加数据。
出于测试目的,我用 Task.Delay(1000).Wait();
或 await Task.Delay(1000);
替换了实际发送数据的代码,它们也都卡住了。
一个简单的 Thread.Sleep(1000);
似乎工作得很好,让我认为异步代码是问题所在。
所以我的问题是:是否有什么东西阻止异步代码在 while 循环中完成?我错过了什么?
我正在这样启动消费者线程:
new Thread(Worker).Start();
消费者代码:
private void Worker()
{
while (!_queue.IsCompleted)
{
IMobileMessage msg = null;
try
{
msg = _queue.Take();
}
catch (InvalidOperationException)
{
}
if (msg != null)
{
try
{
Trace.TraceInformation("Sending: {0}", msg.Name);
Thread.Sleep(1000); // <-- works
//Task.Delay(1000).Wait(); // <-- doesn't work
msg.SentTime = DateTime.UtcNow;
Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
}
catch (Exception e)
{
TraceException(e);
}
}
}
}
正如 spender 正确指出的那样,BlockingCollection
(顾名思义)仅用于阻塞代码,不适用于异步代码。
有异步兼容的 producer/consumer 队列,例如 BufferBlock<T>
。在这种情况下,我认为 ActionBlock<T>
会更好:
private ActionBlock<IMobileMsg> _block = new ActionBlock<IMobileMsg>(async msg =>
{
try
{
Trace.TraceInformation("Sending: {0}", msg.Name);
await Task.Delay(1000);
msg.SentTime = DateTime.UtcNow;
Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime);
}
catch (Exception e)
{
TraceException(e);
}
});
这将替换您的整个消费线程和主循环。