我们应该在 System.Threading.Channels.WaitToReadAsync 循环中使用 ValueTask 吗?
Should we use ValueTask within System.Threading.Channels.WaitToReadAsync loops?
由于我们希望经常读取,并且我们经常在数据已经可以使用时读取,所以应该 SendLoopAsync
return ValueTask
而不是 Task
,这样我们就可以让它免分配?
// Caller
_ = Task.Factory.StartNew(_ => SendLoopAsync(cancellationToken), TaskCreationOptions.LongRunning, cancellationToken);
// Method
private async ValueTask SendLoopAsync(CancellationToken cancellationToken)
{
while (await _outputChannel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (_outputChannel.Reader.TryRead(out var message))
{
using (await _mutex.LockAsync(cancellationToken).ConfigureAwait(false))
{
await _clientWebSocket.SendAsync(message.Data.AsMemory(), message.MessageType, true, cancellationToken).ConfigureAwait(false);
}
}
}
}
不,SendLoopAsync
处没有值返回 ValueTask
而不是 Task
。此方法仅在您的代码中调用一次。避免对微小对象进行单次分配的影响几乎为零。您应该考虑将 ValueTask
用于在循环中重复调用的异步方法,尤其是在热路径中。问题中提供的示例不是这种情况。
附带说明一下,使用 Task.Factory.StartNew
+TaskCreationOptions.LongRunning
组合调用异步方法毫无意义。将要创建的新 Thread
的生命周期非常短。当代码到达异步方法内不完整等待的第一个 await
时,它将立即终止。此外,您将返回一个嵌套的 Task<Task>
,这很难处理。最好使用 Task.Run
。您可以阅读 here 原因。
另请注意,Nito.AsyncEx.AsyncLock
class 并未针对 memory-efficiency 进行优化。每次获取锁时都会发生大量分配。如果你想要一个可以异步获取的 low-allocation 同步原语,你目前最好的选择可能是使用一个 Channel<object>
实例,用一个 null
值初始化:检索要输入的值,将其存储回释放。
使用通道的惯用方式不需要锁、信号量或 Task.Factory.StartNew
。使用通道的典型方法是使用一个仅接受 ChannelReader 作为输入的方法。如果该方法想要使用 Channel 作为输出,它应该自己创建它并且只有 return 一个 ChannelReader
可以传递给其他方法。通过拥有通道,该方法知道何时可以关闭它。
在问题的情况下,代码很简单。一个简单的 await foreach
就足够了:
private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
CancellationToken cancellationToken)
{
await foreach (var msg in reader.ReadAllAsync(cancellationToken))
{
await _clientWebSocket.SendAsync(message.Data.AsMemory(),
message.MessageType,
true, cancellationToken);
}
}
此方法不需要外部 Task.Run
或 Task.Factory.New
即可工作。要运行它,只需调用它并将其任务存储在某处,但不会丢弃:
public MyWorker(ChannelReader<Message> reader,CancellationToken token)
{
.....
_loopTask=SendLoopAsync(reader,token);
}
这样一来,一旦输入通道完成,代码就可以等待 _loopTask
来完成对任何未决消息的处理。
任何阻塞代码都应该运行在里面Task.Run()
,例如
private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
CancellationToken cancellationToken)
{
await foreach (var msg in reader.ReadAllAsync(cancellationToken))
{
var new_msg=await Task.Run(()=>SomeHeavyWork(msg),cancellationToken);
await _clientWebSocket.SendAsync(message.Data.AsMemory(),
message.MessageType,
true, cancellationToken);
}
}
并发工人
此方法也可用于启动 多个 并发 worker :
var tasks=Enumerable.Range(0,dop).Select(_=>SendLoopAsync(reader,token));
_loopTask=Task.WhenAll(tasks);
...
await _loopTask;
在.NET 6中,Parallel.ForEachAsync
可以用更少的代码处理多条消息:
private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
CancellationToken cancellationToken)
{
var options=new ParallelOptions {
CancellationToke=cancellationToken,
MaxDegreeOfParallellism=4
};
var input=reader.ReadAllAsync(cancellationToken);
await Parallel.ForEachAsync(input,options,async (msg,token)=>{
var new_msg=await Task.Run(()=>SomeHeavyWork(msg),token);
await _clientWebSocket.SendAsync(message.Data.AsMemory(),
message.MessageType,
true, token);
});
}
地道频道制作人
不使用存储在字段中的 class-level 通道,而是在生产者方法中创建通道,并且仅 return 它的 reader。这样,生产者方法就可以控制通道的生命周期,并可以在完成后将其关闭。这就是 Channel
只能通过其 Reader 和 Writer 类.
访问的原因之一
一个方法可以消耗一个 ChannelReader 和 return 另一个。这允许创建可以链接在一起进入管道的方法。
一个简单的生产者可以是这样的:
ChannelReader<Message> Producer(CancellationToke token)
{
var channel=Channel.CreateUnbounded<Message>();
var writer=channel.Writer;
_ = Task.Run(()=>{
while(!token.IsCancellationRequested)
{
var msg=SomeHeavyJob();
await writer.SendAsync(msg);
},token)
.ContinueWith(t=>writer.TryComplete(t));
return channel.Reader;
}
当发出取消信号时,worker 退出或抛出异常,主任务存在并且 ContinueWith
调用 TryComplete
编写器并处理可能已抛出的任何异常。这是一个简单的 non-blocking 操作,因此它 运行 在哪个线程上并不重要。
转换方法如下所示:
ChannelReader<Msg2> Transform(ChannelReader<Msg1> input,CancellationToke token)
{
var channel=Channel.CreateUnbounded<Msg2>();
var writer=channel.Writer;
_ = Task.Run(()=>{
await foreach(var msg1 in input.ReadAllAsync(token))
{
var msg2=SomeHeavyJob(msg1);
await writer.SendAsync(msg2);
},token)
.ContinueWith(t=>writer.TryComplete(t));
return channel.Reader;
}
将这些方法转换为静态扩展方法将允许一个接一个地链接它们:
var task=Producer()
.Transformer()
.Consumer();
我没有设置 SingleWriter
因为这似乎还没有做任何事情。在 Github 上的 .NET 运行time repo 中搜索这个没有显示除测试代码之外的任何结果。
由于我们希望经常读取,并且我们经常在数据已经可以使用时读取,所以应该 SendLoopAsync
return ValueTask
而不是 Task
,这样我们就可以让它免分配?
// Caller
_ = Task.Factory.StartNew(_ => SendLoopAsync(cancellationToken), TaskCreationOptions.LongRunning, cancellationToken);
// Method
private async ValueTask SendLoopAsync(CancellationToken cancellationToken)
{
while (await _outputChannel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (_outputChannel.Reader.TryRead(out var message))
{
using (await _mutex.LockAsync(cancellationToken).ConfigureAwait(false))
{
await _clientWebSocket.SendAsync(message.Data.AsMemory(), message.MessageType, true, cancellationToken).ConfigureAwait(false);
}
}
}
}
不,SendLoopAsync
处没有值返回 ValueTask
而不是 Task
。此方法仅在您的代码中调用一次。避免对微小对象进行单次分配的影响几乎为零。您应该考虑将 ValueTask
用于在循环中重复调用的异步方法,尤其是在热路径中。问题中提供的示例不是这种情况。
附带说明一下,使用 Task.Factory.StartNew
+TaskCreationOptions.LongRunning
组合调用异步方法毫无意义。将要创建的新 Thread
的生命周期非常短。当代码到达异步方法内不完整等待的第一个 await
时,它将立即终止。此外,您将返回一个嵌套的 Task<Task>
,这很难处理。最好使用 Task.Run
。您可以阅读 here 原因。
另请注意,Nito.AsyncEx.AsyncLock
class 并未针对 memory-efficiency 进行优化。每次获取锁时都会发生大量分配。如果你想要一个可以异步获取的 low-allocation 同步原语,你目前最好的选择可能是使用一个 Channel<object>
实例,用一个 null
值初始化:检索要输入的值,将其存储回释放。
使用通道的惯用方式不需要锁、信号量或 Task.Factory.StartNew
。使用通道的典型方法是使用一个仅接受 ChannelReader 作为输入的方法。如果该方法想要使用 Channel 作为输出,它应该自己创建它并且只有 return 一个 ChannelReader
可以传递给其他方法。通过拥有通道,该方法知道何时可以关闭它。
在问题的情况下,代码很简单。一个简单的 await foreach
就足够了:
private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
CancellationToken cancellationToken)
{
await foreach (var msg in reader.ReadAllAsync(cancellationToken))
{
await _clientWebSocket.SendAsync(message.Data.AsMemory(),
message.MessageType,
true, cancellationToken);
}
}
此方法不需要外部 Task.Run
或 Task.Factory.New
即可工作。要运行它,只需调用它并将其任务存储在某处,但不会丢弃:
public MyWorker(ChannelReader<Message> reader,CancellationToken token)
{
.....
_loopTask=SendLoopAsync(reader,token);
}
这样一来,一旦输入通道完成,代码就可以等待 _loopTask
来完成对任何未决消息的处理。
任何阻塞代码都应该运行在里面Task.Run()
,例如
private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
CancellationToken cancellationToken)
{
await foreach (var msg in reader.ReadAllAsync(cancellationToken))
{
var new_msg=await Task.Run(()=>SomeHeavyWork(msg),cancellationToken);
await _clientWebSocket.SendAsync(message.Data.AsMemory(),
message.MessageType,
true, cancellationToken);
}
}
并发工人
此方法也可用于启动 多个 并发 worker :
var tasks=Enumerable.Range(0,dop).Select(_=>SendLoopAsync(reader,token));
_loopTask=Task.WhenAll(tasks);
...
await _loopTask;
在.NET 6中,Parallel.ForEachAsync
可以用更少的代码处理多条消息:
private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
CancellationToken cancellationToken)
{
var options=new ParallelOptions {
CancellationToke=cancellationToken,
MaxDegreeOfParallellism=4
};
var input=reader.ReadAllAsync(cancellationToken);
await Parallel.ForEachAsync(input,options,async (msg,token)=>{
var new_msg=await Task.Run(()=>SomeHeavyWork(msg),token);
await _clientWebSocket.SendAsync(message.Data.AsMemory(),
message.MessageType,
true, token);
});
}
地道频道制作人
不使用存储在字段中的 class-level 通道,而是在生产者方法中创建通道,并且仅 return 它的 reader。这样,生产者方法就可以控制通道的生命周期,并可以在完成后将其关闭。这就是 Channel
只能通过其 Reader 和 Writer 类.
一个方法可以消耗一个 ChannelReader 和 return 另一个。这允许创建可以链接在一起进入管道的方法。
一个简单的生产者可以是这样的:
ChannelReader<Message> Producer(CancellationToke token)
{
var channel=Channel.CreateUnbounded<Message>();
var writer=channel.Writer;
_ = Task.Run(()=>{
while(!token.IsCancellationRequested)
{
var msg=SomeHeavyJob();
await writer.SendAsync(msg);
},token)
.ContinueWith(t=>writer.TryComplete(t));
return channel.Reader;
}
当发出取消信号时,worker 退出或抛出异常,主任务存在并且 ContinueWith
调用 TryComplete
编写器并处理可能已抛出的任何异常。这是一个简单的 non-blocking 操作,因此它 运行 在哪个线程上并不重要。
转换方法如下所示:
ChannelReader<Msg2> Transform(ChannelReader<Msg1> input,CancellationToke token)
{
var channel=Channel.CreateUnbounded<Msg2>();
var writer=channel.Writer;
_ = Task.Run(()=>{
await foreach(var msg1 in input.ReadAllAsync(token))
{
var msg2=SomeHeavyJob(msg1);
await writer.SendAsync(msg2);
},token)
.ContinueWith(t=>writer.TryComplete(t));
return channel.Reader;
}
将这些方法转换为静态扩展方法将允许一个接一个地链接它们:
var task=Producer()
.Transformer()
.Consumer();
我没有设置 SingleWriter
因为这似乎还没有做任何事情。在 Github 上的 .NET 运行time repo 中搜索这个没有显示除测试代码之外的任何结果。