优雅地关闭管网套接字

Gracefully closing pipe web sockets

我正在尝试创建一个管道网络套接字客户端,它将从交换中提取数据。该代码介于 David Fowler 的 BedrockFramework and @StephenCleary's TCP Chat 之间。我不确定 IDuplexPipe(David 的方式)和普通管道(Stephen 的方式)之间是否有区别,我相信它们是表达同一事物的两种方式。

问题是我如何优雅地 stop/close 一切,即基本上我想创建一个 StopAsync 方法?附带问题,他们不使用 CancellationToken 的原因是什么?

public sealed class PipeSocketClient
{
    private readonly string _baseUrl;

    private readonly Pipe _inputPipe;

    private readonly Pipe _outputPipe;

    private readonly ClientWebSocket _webSocket;

    private volatile bool _aborted;

    public PipeSocketClient(string baseUrl, PipeOptions? pipeOptions = default)
    {
        _baseUrl = baseUrl;

        _webSocket = new ClientWebSocket();

        _inputPipe = new Pipe(pipeOptions ?? PipeOptions.Default);
        _outputPipe = new Pipe();
    }

    public async ValueTask<PipeSocketClient> StartAsync()
    {
        await _webSocket.ConnectAsync(new Uri(_baseUrl), CancellationToken.None).ConfigureAwait(false);

        _ = ExecuteAsync(_webSocket, _inputPipe.Writer, _outputPipe.Reader);

        return this;
    }

    private async Task ExecuteAsync(WebSocket webSocket, PipeWriter pipeWriter, PipeReader pipeReader)
    {
        Exception? sendError = null;
        try
        {
            // Spawn send and receive logic
            var receiveTask = DoReceiveAsync(webSocket, pipeWriter);
            var sendTask = DoSendAsync(webSocket, pipeReader);

            // If the sending task completes then close the receive
            // We don't need to do this in the other direction because the kestrel
            // will trigger the output closing once the input is complete.
            if (await Task.WhenAny(receiveTask, sendTask).ConfigureAwait(false) == sendTask)
            {
                // Tell the reader it's being aborted
                webSocket.Dispose();
            }

            // Now wait for both to complete
            await receiveTask.ConfigureAwait(false);
            sendError = await sendTask.ConfigureAwait(false);

            // Dispose the socket(should noop if already called)
            webSocket.Dispose();
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Unexpected exception: {ex.Message}");
        }
        finally
        {
            // Complete the output after disposing the socket
            await pipeReader.CompleteAsync(sendError).ConfigureAwait(false);
        }
    }

    private async Task DoReceiveAsync(WebSocket webSocket, PipeWriter pipeWriter)
    {
        Exception? error = null;

        try
        {
            await ProcessReceivesAsync(webSocket, pipeWriter).ConfigureAwait(false);
        }
        catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
        {
            error = new ConnectionResetException(ex.Message, ex);
        }
        catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted ||
                                         ex.SocketErrorCode == SocketError.ConnectionAborted ||
                                         ex.SocketErrorCode == SocketError.Interrupted ||
                                         ex.SocketErrorCode == SocketError.InvalidArgument)
        {
            if (!_aborted)
            {
                // Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix.
                error = new ConnectionAbortedException();
            }
        }
        catch (ObjectDisposedException)
        {
            if (!_aborted)
            {
                error = new ConnectionAbortedException();
            }
        }
        catch (IOException ex)
        {
            error = ex;
        }
        catch (Exception ex)
        {
            error = new IOException(ex.Message, ex);
        }
        finally
        {
            if (_aborted)
            {
                error ??= new ConnectionAbortedException();
            }

            await pipeWriter.CompleteAsync(error).ConfigureAwait(false);
        }
    }

    private async Task ProcessReceivesAsync(WebSocket webSocket, PipeWriter pipeWriter)
    {
        while (true)
        {
            // Ensure we have some reasonable amount of buffer space
            var buffer = pipeWriter.GetMemory();

            var bytesRead = await webSocket.ReceiveAsync(buffer, CancellationToken.None).ConfigureAwait(false);

            if (bytesRead.Count == 0)
            {
                break;
            }

            pipeWriter.Advance(bytesRead.Count);

            var flushTask = pipeWriter.FlushAsync();

            if (!flushTask.IsCompleted)
            {
                await flushTask.ConfigureAwait(false);
            }

            var result = flushTask.Result;
            if (result.IsCompleted)
            {
                // Pipe consumer is shut down, do we stop writing
                break;
            }
        }
    }

    private async Task<Exception?> DoSendAsync(WebSocket webSocket, PipeReader pipeReader)
    {
        Exception? error = null;

        try
        {
            await ProcessSendsAsync(webSocket, pipeReader).ConfigureAwait(false);
        }
        catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
        {
            error = null;
        }
        catch (ObjectDisposedException)
        {
            error = null;
        }
        catch (IOException ex)
        {
            error = ex;
        }
        catch (Exception ex)
        {
            error = new IOException(ex.Message, ex);
        }
        finally
        {
            _aborted = true;
            webSocket.Abort();
        }

        return error;
    }

    private async Task ProcessSendsAsync(WebSocket webSocket, PipeReader pipeReader)
    {
        while (true)
        {
            // Wait for data to write from the pipe producer
            var result = await pipeReader.ReadAsync().ConfigureAwait(false);
            var buffer = result.Buffer;

            if (result.IsCanceled)
            {
                break;
            }

            var end = buffer.End;
            var isCompleted = result.IsCompleted;
            if (!buffer.IsEmpty)
            {
                await webSocket.SendAsync(buffer, WebSocketMessageType.Text).ConfigureAwait(false);
            }

            pipeReader.AdvanceTo(end);

            if (isCompleted)
            {
                break;
            }
        }
    }
}

我还没有详细研究 David Fowler 的框架,但我希望他使用与我相同的完成语义:当输出(套接字写入)管道完成时,然后关闭(网络)套接字.

在您的代码中,ProcessSendsAsync 将在管道完成时 return,这将导致 DoSendAsync 调用 webSocket.Abort。我假设您应该修改 DoSendAsync 以在管道无异常完成时调用 CloseAsync,并且仅在出现异常时调用 Abort

如果您还没有看过,this 是我讨论如何处理关机的视频,既异常又优雅。