使用 System.Threading.Channels 时不执行异步方法

Async method not executing when using System.Threading.Channels

出于某种原因,消费者和生产者任务中的代码似乎从未被执行过。我哪里错了?

using System.Threading.Channels;

namespace TEST.CHANNELS
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            var channel = Channel.CreateUnbounded<int>();
            var cancel = new CancellationToken();
            await Consumer(channel, cancel);
            await Producer(channel, cancel);

            Console.ReadKey();
        }

        private static async Task Producer(Channel<int, int> ch, CancellationToken cancellationToken)
        {
            for (int i = 0; i < 59; i++)
            {
                await Task.Delay(1000, cancellationToken);
                await ch.Writer.WriteAsync(i, cancellationToken);
            }
        }
        
        private static async Task Consumer(Channel<int, int> ch, CancellationToken cancellationToken)
        {
            await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
            {
                Console.WriteLine(item);
            }
        }
    }
}

如果你是新手,我建议阅读Tutorial: Learn to debug C# code using Visual Studio。您应该知道如何设置断点以逐步查看您的代码。运行。

但是现在因为这个涉及 async/Task,它可能看起来很混乱,但是当你进入 Consumer 时,你会看到它停在 await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken)) 行。

原因是消费者正在等待生产者从未放入的东西。原因是您的第一个 await 停止了您的代码,因此第二行永远不会执行。

await Consumer(channel, cancel);
await Producer(channel, cancel);

这应该可以解决问题:

var consumerTask = Consumer(channel, cancel);
var producerTask = Producer(channel, cancel);

await Task.WhenAll(consumerTask, producerTask);

上面的代码说的是,

  1. 运行 Consumer Task,不要等待它,而是在consumerTask.

    中跟踪它
  2. 运行 Producer Task,不要等待,而是在producerTask.

    中跟踪它
  3. 等待 consumerTaskproducerTask 都使用 Task.WhenAll

请注意,Consumer 似乎仍然存在逻辑问题,因为它永远不会退出,因此您的 ReadKey() 可能不会受到攻击(您的应用程序会卡在 WhenAll 行)。如果你打算修复它,如果它是一个错误,我认为这对你来说更容易“实践”。

您的代码试图在生成任何消息之前使用通道中的所有消息。虽然您可以存储 producer/consumer 任务而不是等待它们,但最好使用频道特定的习语和模式。

与其将频道用作某种容器,不如将读者公开并共享给消费者创建和拥有的频道。这就是 Go 中 Channels 的使用方式。

这就是为什么您也只能使用 ChannelReader 和 ChannelWriter 的原因:

  • ChannelReader 是 Go 中的 ch ->,是从通道读取的唯一方法
  • ChannelWriter是Go中的一个ch <-,唯一的写法

使用自有频道

如果您需要异步处理数据,请在 方法中的 任务中执行此操作。这使得 lot 更容易控制通道并知道处理何时完成或取消。它还允许您非常容易地从通道构建管道。

在您的情况下,制作人可能是:

public ChannelReader<int> Producer(CancellationToken cancellationToken)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;
    _ = Task.Run(()=>{
        for (int i = 0; i < 59; i++)
        {
            await Task.Delay(1000, cancellationToken);
            await writer.WriteAsync(i, cancellationToken);
        }
    },cancellationToken)
   .ContinueWith(t=>writer.TryComplete(t.Exception));

   return channel;
}

消费者,如果是懒惰的,可以是:

static async Task ConsumeNumbers(this ChannelReader<int> reader, CancellationToken cancellationToken)
    {
        await foreach (var item in reader.ReadAllAsync(cancellationToken))
        {
            Console.WriteLine(item);
        }
    }

将此作为扩展方法两者都可以结合使用:


await Producer(cancel)
     .ConsumeNumbers(cancel);

在更一般的情况下,管道块从一个通道和 returns 一个通道读取:

public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        await foreach (var item in reader.ReadAllAsync(cancellationToken))
        {
            var newItem=Math.Pow(item,pow);
            await writer.WriteAsync(newItem);
        }
    },cancellationToken)
   .ContinueWith(t=>writer.TryComplete(t.Exception));

   return channel;
}

这将允许创建一系列步骤,例如:

await Producer(cancel)
      .RaiseTo(0.3,cancel)
      .RaiseTo(3,cancel)
      .ConsumeNumbers(cancel);

并行处理

也可以在每个块中使用多个任务,以加快处理速度。在 .NET 6 中,可以使用 Parallel.ForEachAsync 轻松完成此操作:

public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;

    _ = Parallel.ForEachAsync(
            reader.ReadAllAsync(cancellationToken),
            cancellationToken,
            async item=>
            {
                var newItem=Math.Pow(item,pow);
                await writer.WriteAsync(newItem);
            })
   .ContinueWith(t=>writer.TryComplete(t.Exception));

   return channel;
}

注意顺序

通道保留项目 读取请求的顺序。这意味着单任务步骤将始终按顺序消耗 生成消息。但是 Parallel.ForEachAsync 没有这样的保证。如果顺序很重要,您必须添加代码以确保消息按顺序发出,或者尝试通过另一个步骤重新排序。