引导多个生产者和消费者
Channel multiple producers and consumers
我有以下代码:
var channel = Channel.CreateUnbounded<string>();
var consumers = Enumerable
.Range(1, 5)
.Select(consumerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out var item))
{
Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
}
}
}));
var producers = Enumerable
.Range(1, 5)
.Select(producerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
var t = $"Message {i}";
Console.WriteLine($"Producing {t} on producer {producerNumber}");
await channel.Writer.WriteAsync(t);
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
}));
await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete());
await Task.WhenAll(consumers);
它应该可以正常工作,但是我希望它在生产的同时消耗。不过
await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete());
阻止消费者从 运行ning 直到它完成,我想不出让他们都到 运行?
的方法
consumers
和 producers
变量的类型为 IEnumerable<Task>
。这是一个 deferred 可枚举,需要具体化才能创建任务。您可以通过在 LINQ 查询上链接 ToArray
运算符来实现可枚举。这样一来,两个变量的类型就会变成Task[]
,也就是说你的tasks被实例化起来了,运行.
作为旁注,ContinueWith
方法需要显式传递 TaskScheduler.Default
作为参数,否则你将受制于 TaskScheduler.Current
可能是什么(它可能是例如 UI TaskScheduler
)。这是 ContinueWith
:
的正确用法
await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete(), TaskScheduler.Default);
- 代码分析器 CA2008:Do not create tasks without passing a
TaskScheduler
- "[...] 这就是为什么在我编写的生产库代码中,我总是明确指定我要使用的调度程序。" (Stephen Toub)
另一个问题是 producers
抛出的任何异常都将被吞噬,因为没有等待任务。只有继续等待,这不太可能失败。要解决此问题,您可以放弃等待 producers
然后完成频道的 primitive ContinueWith
, and instead use async-await composition (an async local function。在这种情况下,甚至没有必要。你可以简单地这样做:
try { await Task.WhenAll(producers); }
finally { channel.Writer.Complete(); }
频道会在 Task.WhenAll(producers)
任务的任何结果后 Complete
,因此 consumers
不会卡住。
第三个问题是某些producers
的失败将导致当前方法在等待consumers
之前立即终止。然后,这些任务将成为即发即弃的任务。我将留给您找出如何确保 所有 任务在所有情况下都可以等待,然后再成功或出错退出该方法。
代码有几个问题,包括忘记枚举 producers
和 consumers
可枚举。 IEnumerable
是惰性评估的,因此在您使用 foreach
或 ToList
等实际枚举它之前,不会生成任何内容。
如果使用得当,ContinueWith
也没有什么问题。它绝对比使用异常作为控制流更好更便宜。
通过使用一些常见的信道编码模式,代码可以改进很多。
- 生产者拥有并封装频道
- 制作者只公开Reader(s)
另外,ContinueWith
是一个 优秀的 选择来表示 ChannelWriter 的完成,因为我们 根本不关心 哪个线程会这样做。如果有的话,我们更愿意使用“工作”线程之一来避免线程切换。
假设生产者函数是:
async Task Produce(ChannelWriter<string> writer, int producerNumber)
{
return Task.Run(async () =>
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
var t = $"Message {i}";
Console.WriteLine($"Producing {t} on producer {producerNumber}");
await channel.Writer.WriteAsync(t);
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
}
}
制作人
生产者可以是:
ChannelReader<string> ProduceData(int dop)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
var tasks=Enumerable.Range(0,dop)
.Select(producerNumber => Produce(producerNumber))
.ToList();
_ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));
.
return channel.Reader;
}
完成和错误传播
注意行:
_ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));
这表示一旦生产者完成,编写者本身就应该完成,可能会出现任何异常。 continuation 在哪个线程上运行并不重要,因为它除了调用 TryComplete
.
之外什么都不做
更重要的是,t=>writer.TryComplete(t.Exception)
将工作异常传播给下游消费者。否则消费者永远不会知道出了什么问题。如果您有一个数据库使用者,您会希望它在源中止时避免完成任何更改。
消费者
消费者方法可以是:
async Task Consume(ChannelReader<string> reader,int dop,CancellationToken token=default)
{
var tasks= Enumerable
.Range(1, dop)
.Select(consumerNumber =>
Task.Run(async () =>
{
await foreach(var item in reader.ReadAllAsync(token))
{
Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
}
}));
await Task.WhenAll(tasks);
}
在这种情况下,await Task.WhenAll(tasks);
枚举工作任务,从而启动它们。
生成所有生成的消息不需要其他任何东西。当所有生产者都完成时,Channel.Reader
就完成了。发生这种情况时,ReadAllAsync
将继续向消费者提供所有剩余消息并退出。
作文
结合这两种方法非常简单:
var reader=Produce(10);
await Consume(reader);
一般模式
这是使用通道的流水线阶段的一般模式 - 从通道读取输入Reader,将其写入内部通道并且 return 仅拥有通道的 Reader。通过这种方式,stage 拥有通道,这使得完成和错误处理 lot 更容易:
static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,int dop=1,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<TOut>();
var writer=channel.Writer;
var tasks=Enumerable.Range(0,dop)
.Select(async i=>Task.Run(async ()=>
{
await(var item in reader.ReadAllAsync(token))
{
try
{
...
await writer.WriteAsync(msg);
}
catch(Exception exc)
{
//Handle the exception and keep processing messages
}
}
},token));
_ =Task.WhenAll(tasks)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}
这允许将多个“阶段”链接在一起以形成管道:
var finalReader=Producer(...)
.Crunch1()
.Crunch2(10)
.Crunch3();
await foreach(var result in finalReader.ReadAllAsync())
{
...
}
生产者和消费者方法可以用相同的方式编写,例如允许创建数据导入管道:
var importTask = ReadFiles<string>(somePath)
.ParseCsv<string,Record[]>(10)
.ImportToDb<Record>(connectionString);
await importTask;
与ReadFiles
static ChannelReader<string> ReadFiles(string folder)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
var task=Task.Run(async ()=>{
foreach(var path in Directory.EnumerateFiles(folder,"*.csv"))
{
await writer.WriteAsync(path);
}
});
task.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}
.NET 6 更新Parallel.ForEachAsync
现在生产中支持 .NET 6,可以使用 Parallel.ForEachAsync
将并发消费者简化为:
static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,
int dop=1,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<TOut>();
var writer=channel.Writer;
var dop=new ParallelOptions {
MaxDegreeOfParallelism = dop,
CancellationToken = token
};
var task=Parallel.ForEachAsync(
reader.ReadAllAsync(token),
dop,
async item =>{
try
{
...
await writer.WriteAsync(msg);
}
catch(Exception exc)
{
//Handle the exception and keep processing messages
}
});
task.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}
我有以下代码:
var channel = Channel.CreateUnbounded<string>();
var consumers = Enumerable
.Range(1, 5)
.Select(consumerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out var item))
{
Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
}
}
}));
var producers = Enumerable
.Range(1, 5)
.Select(producerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
var t = $"Message {i}";
Console.WriteLine($"Producing {t} on producer {producerNumber}");
await channel.Writer.WriteAsync(t);
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
}));
await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete());
await Task.WhenAll(consumers);
它应该可以正常工作,但是我希望它在生产的同时消耗。不过
await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete());
阻止消费者从 运行ning 直到它完成,我想不出让他们都到 运行?
的方法consumers
和 producers
变量的类型为 IEnumerable<Task>
。这是一个 deferred 可枚举,需要具体化才能创建任务。您可以通过在 LINQ 查询上链接 ToArray
运算符来实现可枚举。这样一来,两个变量的类型就会变成Task[]
,也就是说你的tasks被实例化起来了,运行.
作为旁注,ContinueWith
方法需要显式传递 TaskScheduler.Default
作为参数,否则你将受制于 TaskScheduler.Current
可能是什么(它可能是例如 UI TaskScheduler
)。这是 ContinueWith
:
await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete(), TaskScheduler.Default);
- 代码分析器 CA2008:Do not create tasks without passing a
TaskScheduler
- "[...] 这就是为什么在我编写的生产库代码中,我总是明确指定我要使用的调度程序。" (Stephen Toub)
另一个问题是 producers
抛出的任何异常都将被吞噬,因为没有等待任务。只有继续等待,这不太可能失败。要解决此问题,您可以放弃等待 producers
然后完成频道的 primitive ContinueWith
, and instead use async-await composition (an async local function。在这种情况下,甚至没有必要。你可以简单地这样做:
try { await Task.WhenAll(producers); }
finally { channel.Writer.Complete(); }
频道会在 Task.WhenAll(producers)
任务的任何结果后 Complete
,因此 consumers
不会卡住。
第三个问题是某些producers
的失败将导致当前方法在等待consumers
之前立即终止。然后,这些任务将成为即发即弃的任务。我将留给您找出如何确保 所有 任务在所有情况下都可以等待,然后再成功或出错退出该方法。
代码有几个问题,包括忘记枚举 producers
和 consumers
可枚举。 IEnumerable
是惰性评估的,因此在您使用 foreach
或 ToList
等实际枚举它之前,不会生成任何内容。
如果使用得当,ContinueWith
也没有什么问题。它绝对比使用异常作为控制流更好更便宜。
通过使用一些常见的信道编码模式,代码可以改进很多。
- 生产者拥有并封装频道
- 制作者只公开Reader(s)
另外,ContinueWith
是一个 优秀的 选择来表示 ChannelWriter 的完成,因为我们 根本不关心 哪个线程会这样做。如果有的话,我们更愿意使用“工作”线程之一来避免线程切换。
假设生产者函数是:
async Task Produce(ChannelWriter<string> writer, int producerNumber)
{
return Task.Run(async () =>
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
var t = $"Message {i}";
Console.WriteLine($"Producing {t} on producer {producerNumber}");
await channel.Writer.WriteAsync(t);
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
}
}
制作人
生产者可以是:
ChannelReader<string> ProduceData(int dop)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
var tasks=Enumerable.Range(0,dop)
.Select(producerNumber => Produce(producerNumber))
.ToList();
_ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));
.
return channel.Reader;
}
完成和错误传播
注意行:
_ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));
这表示一旦生产者完成,编写者本身就应该完成,可能会出现任何异常。 continuation 在哪个线程上运行并不重要,因为它除了调用 TryComplete
.
更重要的是,t=>writer.TryComplete(t.Exception)
将工作异常传播给下游消费者。否则消费者永远不会知道出了什么问题。如果您有一个数据库使用者,您会希望它在源中止时避免完成任何更改。
消费者
消费者方法可以是:
async Task Consume(ChannelReader<string> reader,int dop,CancellationToken token=default)
{
var tasks= Enumerable
.Range(1, dop)
.Select(consumerNumber =>
Task.Run(async () =>
{
await foreach(var item in reader.ReadAllAsync(token))
{
Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
}
}));
await Task.WhenAll(tasks);
}
在这种情况下,await Task.WhenAll(tasks);
枚举工作任务,从而启动它们。
生成所有生成的消息不需要其他任何东西。当所有生产者都完成时,Channel.Reader
就完成了。发生这种情况时,ReadAllAsync
将继续向消费者提供所有剩余消息并退出。
作文
结合这两种方法非常简单:
var reader=Produce(10);
await Consume(reader);
一般模式
这是使用通道的流水线阶段的一般模式 - 从通道读取输入Reader,将其写入内部通道并且 return 仅拥有通道的 Reader。通过这种方式,stage 拥有通道,这使得完成和错误处理 lot 更容易:
static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,int dop=1,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<TOut>();
var writer=channel.Writer;
var tasks=Enumerable.Range(0,dop)
.Select(async i=>Task.Run(async ()=>
{
await(var item in reader.ReadAllAsync(token))
{
try
{
...
await writer.WriteAsync(msg);
}
catch(Exception exc)
{
//Handle the exception and keep processing messages
}
}
},token));
_ =Task.WhenAll(tasks)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}
这允许将多个“阶段”链接在一起以形成管道:
var finalReader=Producer(...)
.Crunch1()
.Crunch2(10)
.Crunch3();
await foreach(var result in finalReader.ReadAllAsync())
{
...
}
生产者和消费者方法可以用相同的方式编写,例如允许创建数据导入管道:
var importTask = ReadFiles<string>(somePath)
.ParseCsv<string,Record[]>(10)
.ImportToDb<Record>(connectionString);
await importTask;
与ReadFiles
static ChannelReader<string> ReadFiles(string folder)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;
var task=Task.Run(async ()=>{
foreach(var path in Directory.EnumerateFiles(folder,"*.csv"))
{
await writer.WriteAsync(path);
}
});
task.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}
.NET 6 更新Parallel.ForEachAsync
现在生产中支持 .NET 6,可以使用 Parallel.ForEachAsync
将并发消费者简化为:
static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,
int dop=1,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<TOut>();
var writer=channel.Writer;
var dop=new ParallelOptions {
MaxDegreeOfParallelism = dop,
CancellationToken = token
};
var task=Parallel.ForEachAsync(
reader.ReadAllAsync(token),
dop,
async item =>{
try
{
...
await writer.WriteAsync(msg);
}
catch(Exception exc)
{
//Handle the exception and keep processing messages
}
});
task.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}