为什么 Bedrock ProtocolReader 会忽略 CancellationToken?

Why Bedrock ProtocolReader ignores CancellationToken?

似乎我的自定义 ProtocolReader 忽略了 CancellationToken,而且 result.Cancelled 总是 false

大部分代码来自:https://github.com/davidfowl/BedrockFramework/blob/master/samples/ServerApplication/MyCustomProtocol.cs

我是不是漏了什么?

这是我的 ProtocolReader:

public class CustomProtocolReader : IMessageReader<SocketObject>
{
    public bool TryParseMessage(in ReadOnlySequence<byte> input,
        ref SequencePosition consumed, 
        ref SequencePosition examined, 
        out SocketObject message)
    {
        var reader = new SequenceReader<byte>(input);

        var payload = input.Slice(reader.Position, input.Length);
        message = new SocketObject { Buffer = payload.ToArray() };

        consumed = payload.End;
        examined = consumed;

        return true;
    }
}

这里是 ConnectionHandler:

while (true)
{
    try
    {
        var cts = new CancellationTokenSource();
        cts.CancelAfter(2000);

        // this should throw if nothing is received after 2 seconds.
        var result = await reader.ReadAsync(protocol, cts.Token); 

        if (result.IsCompleted || result.IsCanceled)  // this never hits
        {
            Console.WriteLine("Broke or cancelled");
            break;
        }
    }
    catch (OperationCanceledException) // this never gets fired
    {
        Console.WriteLine("Cancelled");
        break;
    }
    finally
    {
        reader.Advance();
    }
}

更新 1:

默认 connection.Transport.Input.ReadAsync 按预期工作并尊重 CancellationToken.

更新:哦好吧..我刚刚忽略了this part。所以是的,你应该期待 2 秒后取消,甚至 ValueTask 应该打印 IsCancled = true。 :/ 恐怕,但我不知道。我试图理解 Pipe/PipeAwaitable 的 IValueTaskSource.OnCompleted 实现,但它非常复杂。我现在没有时间。

由于历史原因,我将答案放在这里。

It seems that my Custom ProtocolReader ignores CancellationToken

没有。

首先你要知道CancellationToken不是在主动做某事。使用 CancellationToken.CancelAfter 时,它只是通知其来源 CancellationTokenSource 及其子项(由 CancellationTokenSource.CreateLinkedTokenSource 创建的取消令牌源)将其 IsCancellationRequested 设置为 true 并触发所有处理程序( CancellationToken.Register) 在所有涉及的 CancellationTokenSource 中注册。通过不断查看 IsCancellationRequested 或通过 CancellationToken.Register 注册处理程序(参见下面的示例实现),您可以在响应中收听取消请求。

调用 reader.ReadAsync 时最深的子例程是检查一次您传递给 reader.ReadAsync 的令牌是否已被请求取消。让我详细说明一下:

我假设您的 reader 类型为 ProtocolReader has been created from connectionContext.CreateReader (like in MyCustomProtocol) which internally passes Input of the Transport of ConnectionContext to constructor of ProtocolReader. The implementation of ConnectionContext should be SocketConnection. The property Transport is of type IDuplexPipe and is created in SocketConnection.StartAsync which creates a DuplexPipePair and saves its property Transport to SocketConnection.Transport (see property Transport of SocketConnection). The type of Transport has Input and Output and are both of type Pipe. This class uses PipeAwaitable internally in implementation of System.IO.Pipelines.Pipe.ReadAsync (see method ReadAsync of type Pipe) by calling BeginOperation of PipeAwaitable。最后,此方法第一次调用 CancellationToken.ThrowIfCancellationRequested() 您最初通过 var result = await reader.ReadAsync(protocol, cts.Token); 传递的令牌。

因此,只有在代码段 CancellationToken.ThrowIfCancellationRequested() 可以通过的那一刻您的取消请求未被取消时,您的取消标记才会被忽略。

also the result.Cancelled is always false.

您的结果不是任务。当您手动创建任务 (Task.Factory.StartNew) 时,您可以传递一个 CancellationToken。如果此令牌在任务创建之前或之后收到取消请求,则 Task.IsCanceled 将变为真,除非已经完成(请参阅 Task.AssignCancellationToken)。

但是您可能已经意识到您获得的不是 Task 而是 ValueTask。这是一个轻量级任务实现,可减少任务创建开销。这不接受任何 CancellationToken。因此,您最初通过的 CancellationToken 不会用于通知 ValueTask 被取消。换一种说法。一旦 ValueTask 被创建,来自 CancellationTokenSource 的取消将不再将 ValueTask.IsCanceled 变为真。只有它的内部持有变量和 Task 或 IValueTaskSource 类型及其 Task.IsCanceled 或 IValueTaskSource.GetStatus 的实现可以改变这一点,但事实并非如此(见下一段)。

ValueTask 接受任何“预先计算”的值,Task 或实现 IValueTaskSource 的“任务”/承诺。对于实现 IValueTaskSource 的值,指示 ValueTask.IsCanceled = true 的唯一方法是 IValueTaskSource.GetStatus returns ValueTaskSourceStatus.Canceled。 因此 Pipe return 中的 ReadAsync 要么是 ValueTask 包装的同步结果,要么仍然是 pending/promised 结果也被 ValueTask 包装,这很重要,这个 pending/promised 结果将是 Pipe 类型的内部变量,已在 public class Pipe 的构造函数中初始化。但是您最初在 Pipe.ReadAsync 上下文中传递的取消令牌是 而不是 与类型 Pipe 的内部变量(实现 IValueTaskSource)交互以实现其实现方法 IValueTaskSource.GetStatus 到 return ValueTaskSourceStatus.Canceled.

因此,您的任务永远不会成为现实的原因是,在任何分支中,都不会创建带有初始取消令牌的 Task,也不会实现与 IValueTaskSource 交互的实现您最初通过的取消令牌。

那么后果是什么?

在您的令牌被请求取消后调用 ReadAsync 时,您没问题 - 您得到了您想要的 OperationCanceledException。之后,您就不会受到预期的取消行为的影响。所以这就是你必须开始自己实施解决方案的地方:

        public Task ReadAsync() =>
            // This is just an example.
            Task.Delay(int.MaxValue);

        public async Task DoTaskNotForever(CancellationToken token = default)
        {
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
            cts.CancelAfter(2);
            var tcs = new TaskCompletionSource<object>();
            using var registration = cts.Token.Register(() => tcs.SetCanceled());

            while (true) {
                try {
                    await Task.WhenAny(ReadAsync(), tcs.Task).ConfigureAwait(false);
                } catch (OperationCanceledException error) { 
                    // Ensure to flush/close/reconnect socket !!! (see below why)
                }
            }
        }

在你使用上面的之前...我只能建议你使用库 https://github.com/StephenCleary/AsyncEx. Especially Nito.AsyncEx.Tasks and the Task extension method WaitAsync

类似于:Tasks.WhenAny(theActualTask, throwOnCancellationRequestedTask) 其中 throwOnCancellationRequestedTask 持续查找取消请求,如果是则抛出:

using using Nito.AsyncEx;

...

while (true)
{
    try
    {
        var cts = new CancellationTokenSource();
        cts.CancelAfter(2000);

        // this should throw if nothing is received after 2 seconds.
        var result = await reader.ReadAsync(protocol, cts.Token).WaitAsync(cts.Token); 

        if (result.IsCompleted || result.IsCanceled)  // this should hit
        {
            Console.WriteLine("Broke or cancelled");
            break;
        }
    }
    catch (OperationCanceledException) // should get fired after 2 seconds unless ReadAsync has not yet finished
    {
        Console.WriteLine("Cancelled");
        break;
    }
    finally
    {
        reader.Advance();
    }
}

"确保flush/close/reconnect套接字!!!"

WaitAsync(来自 Nito.AsyncEx.Tasks)或您自己的实施取消超时后,您应该重新创建套接字,或实施安全程序以确保完成或仍然 运行 来自 ReadAsync 的 ValueTask 被“完成”。在 SerialPort 中,您可以刷新 SerialPort.BaseStream,这样您从 SerialPort.ReadAsync 获得的 Task 就会“完成”。