引导多个生产者和消费者

Channel multiple producers and consumers

我有以下代码:

var channel = Channel.CreateUnbounded<string>();

var consumers = Enumerable
    .Range(1, 5)   
    .Select(consumerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var item))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }
        }));

var producers = Enumerable
    .Range(1, 5)    
    .Select(producerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            for (var i = 0; i < 10; i++)
            {
                var t = $"Message {i}";
                Console.WriteLine($"Producing {t} on producer {producerNumber}");

                await channel.Writer.WriteAsync(t);
                await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
            }
        }));

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());

await Task.WhenAll(consumers);

它应该可以正常工作,但是我希望它在生产的同时消耗。不过

await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());

阻止消费者从 运行ning 直到它完成,我想不出让他们都到 运行?

的方法

consumersproducers 变量的类型为 IEnumerable<Task>。这是一个 deferred 可枚举,需要具体化才能创建任务。您可以通过在 LINQ 查询上链接 ToArray 运算符来实现可枚举。这样一来,两个变量的类型就会变成Task[],也就是说你的tasks被实例化起来了,运行.

作为旁注,ContinueWith 方法需要显式传递 TaskScheduler.Default 作为参数,否则你将受制于 TaskScheduler.Current 可能是什么(它可能是例如 UI TaskScheduler)。这是 ContinueWith:

的正确用法
await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete(), TaskScheduler.Default);
  1. 代码分析器 CA2008:Do not create tasks without passing a TaskScheduler
  2. "[...] 这就是为什么在我编写的生产库代码中,我总是明确指定我要使用的调度程序。" (Stephen Toub)

另一个问题是 producers 抛出的任何异常都将被吞噬,因为没有等待任务。只有继续等待,这不太可能失败。要解决此问题,您可以放弃等待 producers 然后完成频道的 primitive ContinueWith, and instead use async-await composition (an async local function。在这种情况下,甚至没有必要。你可以简单地这样做:

try { await Task.WhenAll(producers); }
finally { channel.Writer.Complete(); }

频道会在 Task.WhenAll(producers) 任务的任何结果后 Complete,因此 consumers 不会卡住。

第三个问题是某些producers 的失败将导致当前方法在等待consumers 之前立即终止。然后,这些任务将成为即发即弃的任务。我将留给您找出如何确保 所有 任务在所有情况下都可以等待,然后再成功或出错退出该方法。

代码有几个问题,包括忘记枚举 producersconsumers 可枚举。 IEnumerable 是惰性评估的,因此在您使用 foreachToList 等实际枚举它之前,不会生成任何内容。

如果使用得当,ContinueWith也没有什么问题。它绝对比使用异常作为控制流更好更便宜。

通过使用一些常见的信道编码模式,代码可以改进很多

  1. 生产者拥有并封装频道
  2. 制作者只公开Reader(s)

另外,ContinueWith 是一个 优秀的 选择来表示 ChannelWriter 的完成,因为我们 根本不关心 哪个线程会这样做。如果有的话,我们更愿意使用“工作”线程之一来避免线程切换。

假设生产者函数是:

async Task Produce(ChannelWriter<string> writer, int producerNumber)
{
    return Task.Run(async () =>
    {
        var rnd = new Random();
        for (var i = 0; i < 10; i++)
        {
            var t = $"Message {i}";
            Console.WriteLine($"Producing {t} on producer {producerNumber}");

            await channel.Writer.WriteAsync(t);
            await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
        }
    }
}

制作人

生产者可以是:

ChannelReader<string> ProduceData(int dop)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;

    var tasks=Enumerable.Range(0,dop)
                 .Select(producerNumber => Produce(producerNumber))
                 .ToList();
    _ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));
                       .
    
    return channel.Reader;
}

完成和错误传播

注意行:

_ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));

这表示一旦生产者完成,编写者本身就应该完成,可能会出现任何异常。 continuation 在哪个线程上运行并不重要,因为它除了调用 TryComplete.

之外什么都不做

更重要的是,t=>writer.TryComplete(t.Exception) 将工作异常传播给下游消费者。否则消费者永远不会知道出了什么问题。如果您有一个数据库使用者,您会希望它在源中止时避免完成任何更改。

消费者

消费者方法可以是:

async Task Consume(ChannelReader<string> reader,int dop,CancellationToken token=default)
{
    var tasks= Enumerable
        .Range(1, dop)   
        .Select(consumerNumber =>
            Task.Run(async () =>
            {
                await foreach(var item in reader.ReadAllAsync(token))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }));
    await Task.WhenAll(tasks);
}

在这种情况下,await Task.WhenAll(tasks); 枚举工作任务,从而启动它们。

生成所有生成的消息不需要其他任何东西。当所有生产者都完成时,Channel.Reader 就完成了。发生这种情况时,ReadAllAsync 将继续向消费者提供所有剩余消息并退出。

作文

结合这两种方法非常简单:

var reader=Produce(10);
await Consume(reader);

一般模式

这是使用通道的流水线阶段的一般模式 - 从通道读取输入Reader,将其写入内部通道并且 return 仅拥有通道的 Reader。通过这种方式,stage 拥有通道,这使得完成和错误处理 lot 更容易:

static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,int dop=1,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<TOut>();
    var writer=channel.Writer;

    var tasks=Enumerable.Range(0,dop)
              .Select(async i=>Task.Run(async ()=>
              {
                  await(var item in reader.ReadAllAsync(token))
                  {
                      try
                      {
                          ...
                          await writer.WriteAsync(msg);
                      }
                      catch(Exception exc)
                      {
                          //Handle the exception and keep processing messages
                      }
                  }
              },token));
    _ =Task.WhenAll(tasks)
           .ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}

这允许将多个“阶段”链接在一起以形成管道:

var finalReader=Producer(...)
                .Crunch1()
                .Crunch2(10)
                .Crunch3();
await foreach(var result in finalReader.ReadAllAsync())
{
...
}

生产者和消费者方法可以用相同的方式编写,例如允许创建数据导入管道:

var importTask = ReadFiles<string>(somePath)
                  .ParseCsv<string,Record[]>(10)
                  .ImportToDb<Record>(connectionString);

await importTask;

ReadFiles

static ChannelReader<string> ReadFiles(string folder)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;

    var task=Task.Run(async ()=>{
        foreach(var path in Directory.EnumerateFiles(folder,"*.csv"))
        {
            await writer.WriteAsync(path);
        }
    });
    task.ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}

.NET 6 更新Parallel.ForEachAsync

现在生产中支持 .NET 6,可以使用 Parallel.ForEachAsync 将并发消费者简化为:

static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,
                           int dop=1,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<TOut>();
    var writer=channel.Writer;

    var dop=new ParallelOptions { 
                MaxDegreeOfParallelism = dop,
                CancellationToken = token
    };
    var task=Parallel.ForEachAsync(
                 reader.ReadAllAsync(token),
                 dop,
                 async item =>{
                      try
                      {
                          ...
                          await writer.WriteAsync(msg);
                      }
                      catch(Exception exc)
                      {
                          //Handle the exception and keep processing messages
                      }
                  });
    task.ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel.Reader;
}