等待来自永无止境的任务的事件异步
Wait async for a event from a never ending Task
目前我正在构建一个需要等待传入消息的网络服务器,但我需要异步等待这些消息。这些消息在保持 运行 的另一个任务中接收。无尽的任务推送事件,另一个任务需要等待这些事件之一也被接收。
如果需要的话,我会做一个草图来更好地形象化问题
我已经看到 TaskCompletionSource class,但除非客户端断开连接,否则任务将不会完成,因此它不会起作用。等待任务完全一样
是否有适用于 C#/.net 核心的库或内置解决方案?
谢谢:)
.NET Core 2.1 中有新的 Channel<T>
API 和 System.Threading.Channels 可让您异步使用队列:
Channel<int> channel = Channel.CreateUnbounded<int>();
...
ChannelReader<int> c = channel.Reader;
while (await c.WaitToReadAsync())
{
if (await c.ReadAsync(out int item))
{
// process item...
}
}
请参阅 this 博客 post 了解介绍和更多使用示例。
频道
ASP.NET Core SignalR 使用 Channels to implement event streaming - 发布者方法异步生成由另一个方法处理的事件。在 SignalR 的例子中,消费者将每个新事件推送给客户端。
在您的代码中执行相同的操作很容易,但请确保遵循 SignalR 文档中显示的模式 - 通道由工作人员自己创建,永远不会向调用者公开。这意味着通道的状态没有歧义,因为只有工作人员可以关闭它。
另一个重点是您必须在完成后关闭频道,即使出现异常也是如此。否则,消费者将无限期地阻止。
您还需要将 CancellationToken 传递给生产者,以便您可以在某个时候终止它。即使您不打算显式取消生产者,您也需要一种方法来告诉它在应用程序终止时停止。
以下方法创建通道 并且 确保它即使在发生异常时也会关闭。
ChannelReader<int> MyProducer(someparameters,CancellationToken token)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(async()=>{
while(!token.IsCancellationRequested)
{
var i= ... //do something to produce a value
await writer.WriteAsync(i,token);
}
},token)
//IMPORTANT: Close the channel no matter what.
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader;
}
t.Exception
如果生产者正常完成或者通过令牌取消工作任务,则
t.Exception
将为 null。没有理由使用 ChannelReader.TryComplete
因为一次只有一个任务是写给作者。
消费者只需要 reader 来消费事件:
async Task MyConsumer(ChannerReader<int> reader,CancellationToken token)
{
while (await reader.WaitToReadAsync(cts.Token).ConfigureAwait(false))
{
while (reader.TryRead(out int item))
{
//Use that integer
}
}
}
可用于:
var reader=MyProducer(...,cts.Token);
await MyConsumer(reader,cts.Token);
IAsyncEnumerable
我在这里有点作弊,因为我从 ChannelReader.ReadAllAsync 方法复制了循环代码,其中 returns 一个允许异步循环的 IAsyncEnumerable。在 .NET Core 3.0 和 .NET Standard 2.1(但不仅如此)中,可以用 :
替换消费者
await foreach (var i from reader.ReadAllAsync(token))
{
//Use that integer
}
Microsoft.Bcl.AsyncInterfaces 包添加了与以前的 .NET 版本相同的接口
目前我正在构建一个需要等待传入消息的网络服务器,但我需要异步等待这些消息。这些消息在保持 运行 的另一个任务中接收。无尽的任务推送事件,另一个任务需要等待这些事件之一也被接收。
如果需要的话,我会做一个草图来更好地形象化问题
我已经看到 TaskCompletionSource class,但除非客户端断开连接,否则任务将不会完成,因此它不会起作用。等待任务完全一样
是否有适用于 C#/.net 核心的库或内置解决方案?
谢谢:)
.NET Core 2.1 中有新的 Channel<T>
API 和 System.Threading.Channels 可让您异步使用队列:
Channel<int> channel = Channel.CreateUnbounded<int>();
...
ChannelReader<int> c = channel.Reader;
while (await c.WaitToReadAsync())
{
if (await c.ReadAsync(out int item))
{
// process item...
}
}
请参阅 this 博客 post 了解介绍和更多使用示例。
频道
ASP.NET Core SignalR 使用 Channels to implement event streaming - 发布者方法异步生成由另一个方法处理的事件。在 SignalR 的例子中,消费者将每个新事件推送给客户端。
在您的代码中执行相同的操作很容易,但请确保遵循 SignalR 文档中显示的模式 - 通道由工作人员自己创建,永远不会向调用者公开。这意味着通道的状态没有歧义,因为只有工作人员可以关闭它。
另一个重点是您必须在完成后关闭频道,即使出现异常也是如此。否则,消费者将无限期地阻止。
您还需要将 CancellationToken 传递给生产者,以便您可以在某个时候终止它。即使您不打算显式取消生产者,您也需要一种方法来告诉它在应用程序终止时停止。
以下方法创建通道 并且 确保它即使在发生异常时也会关闭。
ChannelReader<int> MyProducer(someparameters,CancellationToken token)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(async()=>{
while(!token.IsCancellationRequested)
{
var i= ... //do something to produce a value
await writer.WriteAsync(i,token);
}
},token)
//IMPORTANT: Close the channel no matter what.
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader;
}
t.Exception
如果生产者正常完成或者通过令牌取消工作任务,则
t.Exception
将为 null。没有理由使用 ChannelReader.TryComplete
因为一次只有一个任务是写给作者。
消费者只需要 reader 来消费事件:
async Task MyConsumer(ChannerReader<int> reader,CancellationToken token)
{
while (await reader.WaitToReadAsync(cts.Token).ConfigureAwait(false))
{
while (reader.TryRead(out int item))
{
//Use that integer
}
}
}
可用于:
var reader=MyProducer(...,cts.Token);
await MyConsumer(reader,cts.Token);
IAsyncEnumerable
我在这里有点作弊,因为我从 ChannelReader.ReadAllAsync 方法复制了循环代码,其中 returns 一个允许异步循环的 IAsyncEnumerable。在 .NET Core 3.0 和 .NET Standard 2.1(但不仅如此)中,可以用 :
替换消费者await foreach (var i from reader.ReadAllAsync(token))
{
//Use that integer
}
Microsoft.Bcl.AsyncInterfaces 包添加了与以前的 .NET 版本相同的接口