使用 System.Threading.Channels 时不执行异步方法
Async method not executing when using System.Threading.Channels
出于某种原因,消费者和生产者任务中的代码似乎从未被执行过。我哪里错了?
using System.Threading.Channels;
namespace TEST.CHANNELS
{
public class Program
{
public static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
var cancel = new CancellationToken();
await Consumer(channel, cancel);
await Producer(channel, cancel);
Console.ReadKey();
}
private static async Task Producer(Channel<int, int> ch, CancellationToken cancellationToken)
{
for (int i = 0; i < 59; i++)
{
await Task.Delay(1000, cancellationToken);
await ch.Writer.WriteAsync(i, cancellationToken);
}
}
private static async Task Consumer(Channel<int, int> ch, CancellationToken cancellationToken)
{
await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine(item);
}
}
}
}
如果你是新手,我建议阅读Tutorial: Learn to debug C# code using Visual Studio。您应该知道如何设置断点以逐步查看您的代码。运行。
但是现在因为这个涉及 async/Task,它可能看起来很混乱,但是当你进入 Consumer
时,你会看到它停在 await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
行。
原因是消费者正在等待生产者从未放入的东西。原因是您的第一个 await
停止了您的代码,因此第二行永远不会执行。
await Consumer(channel, cancel);
await Producer(channel, cancel);
这应该可以解决问题:
var consumerTask = Consumer(channel, cancel);
var producerTask = Producer(channel, cancel);
await Task.WhenAll(consumerTask, producerTask);
上面的代码说的是,
运行 Consumer Task,不要等待它,而是在consumerTask
.
中跟踪它
运行 Producer Task,不要等待,而是在producerTask
.
中跟踪它
等待 consumerTask
和 producerTask
都使用 Task.WhenAll。
请注意,Consumer 似乎仍然存在逻辑问题,因为它永远不会退出,因此您的 ReadKey()
可能不会受到攻击(您的应用程序会卡在 WhenAll
行)。如果你打算修复它,如果它是一个错误,我认为这对你来说更容易“实践”。
您的代码试图在生成任何消息之前使用通道中的所有消息。虽然您可以存储 producer/consumer 任务而不是等待它们,但最好使用频道特定的习语和模式。
与其将频道用作某种容器,不如将读者公开并共享给消费者创建和拥有的频道。这就是 Go 中 Channels 的使用方式。
这就是为什么您也只能使用 ChannelReader 和 ChannelWriter 的原因:
- ChannelReader 是 Go 中的
ch ->
,是从通道读取的唯一方法
- ChannelWriter是Go中的一个
ch <-
,唯一的写法
使用自有频道
如果您需要异步处理数据,请在 方法中的 任务中执行此操作。这使得 lot 更容易控制通道并知道处理何时完成或取消。它还允许您非常容易地从通道构建管道。
在您的情况下,制作人可能是:
public ChannelReader<int> Producer(CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(()=>{
for (int i = 0; i < 59; i++)
{
await Task.Delay(1000, cancellationToken);
await writer.WriteAsync(i, cancellationToken);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
消费者,如果是懒惰的,可以是:
static async Task ConsumeNumbers(this ChannelReader<int> reader, CancellationToken cancellationToken)
{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine(item);
}
}
将此作为扩展方法两者都可以结合使用:
await Producer(cancel)
.ConsumeNumbers(cancel);
在更一般的情况下,管道块从一个通道和 returns 一个通道读取:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
这将允许创建一系列步骤,例如:
await Producer(cancel)
.RaiseTo(0.3,cancel)
.RaiseTo(3,cancel)
.ConsumeNumbers(cancel);
并行处理
也可以在每个块中使用多个任务,以加快处理速度。在 .NET 6 中,可以使用 Parallel.ForEachAsync
轻松完成此操作:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Parallel.ForEachAsync(
reader.ReadAllAsync(cancellationToken),
cancellationToken,
async item=>
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
})
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
注意顺序
通道保留项目 和 读取请求的顺序。这意味着单任务步骤将始终按顺序消耗 和 生成消息。但是 Parallel.ForEachAsync
没有这样的保证。如果顺序很重要,您必须添加代码以确保消息按顺序发出,或者尝试通过另一个步骤重新排序。
出于某种原因,消费者和生产者任务中的代码似乎从未被执行过。我哪里错了?
using System.Threading.Channels;
namespace TEST.CHANNELS
{
public class Program
{
public static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
var cancel = new CancellationToken();
await Consumer(channel, cancel);
await Producer(channel, cancel);
Console.ReadKey();
}
private static async Task Producer(Channel<int, int> ch, CancellationToken cancellationToken)
{
for (int i = 0; i < 59; i++)
{
await Task.Delay(1000, cancellationToken);
await ch.Writer.WriteAsync(i, cancellationToken);
}
}
private static async Task Consumer(Channel<int, int> ch, CancellationToken cancellationToken)
{
await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine(item);
}
}
}
}
如果你是新手,我建议阅读Tutorial: Learn to debug C# code using Visual Studio。您应该知道如何设置断点以逐步查看您的代码。运行。
但是现在因为这个涉及 async/Task,它可能看起来很混乱,但是当你进入 Consumer
时,你会看到它停在 await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
行。
原因是消费者正在等待生产者从未放入的东西。原因是您的第一个 await
停止了您的代码,因此第二行永远不会执行。
await Consumer(channel, cancel);
await Producer(channel, cancel);
这应该可以解决问题:
var consumerTask = Consumer(channel, cancel);
var producerTask = Producer(channel, cancel);
await Task.WhenAll(consumerTask, producerTask);
上面的代码说的是,
运行 Consumer Task,不要等待它,而是在
中跟踪它consumerTask
.运行 Producer Task,不要等待,而是在
中跟踪它producerTask
.等待
consumerTask
和producerTask
都使用 Task.WhenAll。
请注意,Consumer 似乎仍然存在逻辑问题,因为它永远不会退出,因此您的 ReadKey()
可能不会受到攻击(您的应用程序会卡在 WhenAll
行)。如果你打算修复它,如果它是一个错误,我认为这对你来说更容易“实践”。
您的代码试图在生成任何消息之前使用通道中的所有消息。虽然您可以存储 producer/consumer 任务而不是等待它们,但最好使用频道特定的习语和模式。
与其将频道用作某种容器,不如将读者公开并共享给消费者创建和拥有的频道。这就是 Go 中 Channels 的使用方式。
这就是为什么您也只能使用 ChannelReader 和 ChannelWriter 的原因:
- ChannelReader 是 Go 中的
ch ->
,是从通道读取的唯一方法 - ChannelWriter是Go中的一个
ch <-
,唯一的写法
使用自有频道
如果您需要异步处理数据,请在 方法中的 任务中执行此操作。这使得 lot 更容易控制通道并知道处理何时完成或取消。它还允许您非常容易地从通道构建管道。
在您的情况下,制作人可能是:
public ChannelReader<int> Producer(CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(()=>{
for (int i = 0; i < 59; i++)
{
await Task.Delay(1000, cancellationToken);
await writer.WriteAsync(i, cancellationToken);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
消费者,如果是懒惰的,可以是:
static async Task ConsumeNumbers(this ChannelReader<int> reader, CancellationToken cancellationToken)
{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine(item);
}
}
将此作为扩展方法两者都可以结合使用:
await Producer(cancel)
.ConsumeNumbers(cancel);
在更一般的情况下,管道块从一个通道和 returns 一个通道读取:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
这将允许创建一系列步骤,例如:
await Producer(cancel)
.RaiseTo(0.3,cancel)
.RaiseTo(3,cancel)
.ConsumeNumbers(cancel);
并行处理
也可以在每个块中使用多个任务,以加快处理速度。在 .NET 6 中,可以使用 Parallel.ForEachAsync
轻松完成此操作:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Parallel.ForEachAsync(
reader.ReadAllAsync(cancellationToken),
cancellationToken,
async item=>
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
})
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
注意顺序
通道保留项目 和 读取请求的顺序。这意味着单任务步骤将始终按顺序消耗 和 生成消息。但是 Parallel.ForEachAsync
没有这样的保证。如果顺序很重要,您必须添加代码以确保消息按顺序发出,或者尝试通过另一个步骤重新排序。