我们应该在 System.Threading.Channels.WaitToReadAsync 循环中使用 ValueTask 吗?

Should we use ValueTask within System.Threading.Channels.WaitToReadAsync loops?

由于我们希望经常读取,并且我们经常在数据已经可以使用时读取,所以应该 SendLoopAsync return ValueTask 而不是 Task,这样我们就可以让它免分配?

// Caller
_ = Task.Factory.StartNew(_ => SendLoopAsync(cancellationToken), TaskCreationOptions.LongRunning, cancellationToken);

// Method
private async ValueTask SendLoopAsync(CancellationToken cancellationToken)
{
    while (await _outputChannel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        while (_outputChannel.Reader.TryRead(out var message))
        {
            using (await _mutex.LockAsync(cancellationToken).ConfigureAwait(false))
            {
                await _clientWebSocket.SendAsync(message.Data.AsMemory(), message.MessageType, true, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}

不,SendLoopAsync 处没有值返回 ValueTask 而不是 Task。此方法仅在您的代码中调用一次。避免对微小对象进行单次分配的影响几乎为零。您应该考虑将 ValueTask 用于在循环中重复调用的异步方法,尤其是在热路径中。问题中提供的示例不是这种情况。

附带说明一下,使用 Task.Factory.StartNew+TaskCreationOptions.LongRunning 组合调用异步方法毫无意义。将要创建的新 Thread 的生命周期非常短。当代码到达异步方法内不完整等待的第一个 await 时,它将立即终止。此外,您将返回一个嵌套的 Task<Task>,这很难处理。最好使用 Task.Run。您可以阅读 here 原因。

另请注意,Nito.AsyncEx.AsyncLock class 并未针对 memory-efficiency 进行优化。每次获取锁时都会发生大量分配。如果你想要一个可以异步获取的 low-allocation 同步原语,你目前最好的选择可能是使用一个 Channel<object> 实例,用一个 null 值初始化:检索要输入的值,将其存储回释放。

使用通道的惯用方式不需要锁、信号量或 Task.Factory.StartNew。使用通道的典型方法是使用一个仅接受 ChannelReader 作为输入的方法。如果该方法想要使用 Channel 作为输出,它应该自己创建它并且只有 return 一个 ChannelReader 可以传递给其他方法。通过拥有通道,该方法知道何时可以关闭它。

在问题的情况下,代码很简单。一个简单的 await foreach 就足够了:

private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
                                      CancellationToken cancellationToken)
{
    await foreach (var msg in reader.ReadAllAsync(cancellationToken))
    {
        await _clientWebSocket.SendAsync(message.Data.AsMemory(), 
                      message.MessageType, 
                      true, cancellationToken);
    }
}

此方法不需要外部 Task.RunTask.Factory.New 即可工作。要运行它,只需调用它并将其任务存储在某处,但不会丢弃:

public MyWorker(ChannelReader<Message> reader,CancellationToken token)
{
   .....
   _loopTask=SendLoopAsync(reader,token);
}

这样一来,一旦输入通道完成,代码就可以等待 _loopTask 来完成对任何未决消息的处理。

任何阻塞代码都应该运行里面Task.Run(),例如

private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
                                      CancellationToken cancellationToken)
{
    await foreach (var msg in reader.ReadAllAsync(cancellationToken))
    {
        var new_msg=await Task.Run(()=>SomeHeavyWork(msg),cancellationToken);
        await _clientWebSocket.SendAsync(message.Data.AsMemory(), 
                      message.MessageType, 
                      true, cancellationToken);
    }
}

并发工人

此方法也可用于启动 多个 并发 worker :


var tasks=Enumerable.Range(0,dop).Select(_=>SendLoopAsync(reader,token));
_loopTask=Task.WhenAll(tasks);
...
await _loopTask;

在.NET 6中,Parallel.ForEachAsync可以用更少的代码处理多条消息:

private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
                                      CancellationToken cancellationToken)
{
    var options=new ParallelOptions {
        CancellationToke=cancellationToken,
        MaxDegreeOfParallellism=4
    };
    var input=reader.ReadAllAsync(cancellationToken);
    await Parallel.ForEachAsync(input,options,async (msg,token)=>{
        var new_msg=await Task.Run(()=>SomeHeavyWork(msg),token);
        await _clientWebSocket.SendAsync(message.Data.AsMemory(), 
                      message.MessageType, 
                      true, token);
    });
}

地道频道制作人

不使用存储在字段中的 class-level 通道,而是在生产者方法中创建通道,并且仅 return 它的 reader。这样,生产者方法就可以控制通道的生命周期,并可以在完成后将其关闭。这就是 Channel 只能通过其 Reader 和 Writer 类.

访问的原因之一

一个方法可以消耗一个 ChannelReader return 另一个。这允许创建可以链接在一起进入管道的方法。

一个简单的生产者可以是这样的:

ChannelReader<Message> Producer(CancellationToke token)
{
    var channel=Channel.CreateUnbounded<Message>();
    var writer=channel.Writer;
    _ = Task.Run(()=>{
        while(!token.IsCancellationRequested)
        {
            var msg=SomeHeavyJob();
            await writer.SendAsync(msg);
        },token)
    .ContinueWith(t=>writer.TryComplete(t));

    return channel.Reader;
}

当发出取消信号时,worker 退出或抛出异常,主任务存在并且 ContinueWith 调用 TryComplete 编写器并处理可能已抛出的任何异常。这是一个简单的 non-blocking 操作,因此它 运行 在哪个线程上并不重要。

转换方法如下所示:

ChannelReader<Msg2> Transform(ChannelReader<Msg1> input,CancellationToke token)
{
    var channel=Channel.CreateUnbounded<Msg2>();
    var writer=channel.Writer;
    _ = Task.Run(()=>{
        await foreach(var msg1 in input.ReadAllAsync(token))
        {
            var msg2=SomeHeavyJob(msg1);
            await writer.SendAsync(msg2);
        },token)
    .ContinueWith(t=>writer.TryComplete(t));

    return channel.Reader;
}

将这些方法转换为静态扩展方法将允许一个接一个地链接它们:

var task=Producer()
          .Transformer()
          .Consumer();

我没有设置 SingleWriter 因为这似乎还没有做任何事情。在 Github 上的 .NET 运行time repo 中搜索这个没有显示除测试代码之外的任何结果。