优雅地关闭管网套接字
Gracefully closing pipe web sockets
我正在尝试创建一个管道网络套接字客户端,它将从交换中提取数据。该代码介于 David Fowler 的 BedrockFramework and @StephenCleary's TCP Chat 之间。我不确定 IDuplexPipe
(David 的方式)和普通管道(Stephen 的方式)之间是否有区别,我相信它们是表达同一事物的两种方式。
问题是我如何优雅地 stop/close 一切,即基本上我想创建一个 StopAsync
方法?附带问题,他们不使用 CancellationToken
的原因是什么?
public sealed class PipeSocketClient
{
private readonly string _baseUrl;
private readonly Pipe _inputPipe;
private readonly Pipe _outputPipe;
private readonly ClientWebSocket _webSocket;
private volatile bool _aborted;
public PipeSocketClient(string baseUrl, PipeOptions? pipeOptions = default)
{
_baseUrl = baseUrl;
_webSocket = new ClientWebSocket();
_inputPipe = new Pipe(pipeOptions ?? PipeOptions.Default);
_outputPipe = new Pipe();
}
public async ValueTask<PipeSocketClient> StartAsync()
{
await _webSocket.ConnectAsync(new Uri(_baseUrl), CancellationToken.None).ConfigureAwait(false);
_ = ExecuteAsync(_webSocket, _inputPipe.Writer, _outputPipe.Reader);
return this;
}
private async Task ExecuteAsync(WebSocket webSocket, PipeWriter pipeWriter, PipeReader pipeReader)
{
Exception? sendError = null;
try
{
// Spawn send and receive logic
var receiveTask = DoReceiveAsync(webSocket, pipeWriter);
var sendTask = DoSendAsync(webSocket, pipeReader);
// If the sending task completes then close the receive
// We don't need to do this in the other direction because the kestrel
// will trigger the output closing once the input is complete.
if (await Task.WhenAny(receiveTask, sendTask).ConfigureAwait(false) == sendTask)
{
// Tell the reader it's being aborted
webSocket.Dispose();
}
// Now wait for both to complete
await receiveTask.ConfigureAwait(false);
sendError = await sendTask.ConfigureAwait(false);
// Dispose the socket(should noop if already called)
webSocket.Dispose();
}
catch (Exception ex)
{
Console.WriteLine($"Unexpected exception: {ex.Message}");
}
finally
{
// Complete the output after disposing the socket
await pipeReader.CompleteAsync(sendError).ConfigureAwait(false);
}
}
private async Task DoReceiveAsync(WebSocket webSocket, PipeWriter pipeWriter)
{
Exception? error = null;
try
{
await ProcessReceivesAsync(webSocket, pipeWriter).ConfigureAwait(false);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
{
error = new ConnectionResetException(ex.Message, ex);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted ||
ex.SocketErrorCode == SocketError.ConnectionAborted ||
ex.SocketErrorCode == SocketError.Interrupted ||
ex.SocketErrorCode == SocketError.InvalidArgument)
{
if (!_aborted)
{
// Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix.
error = new ConnectionAbortedException();
}
}
catch (ObjectDisposedException)
{
if (!_aborted)
{
error = new ConnectionAbortedException();
}
}
catch (IOException ex)
{
error = ex;
}
catch (Exception ex)
{
error = new IOException(ex.Message, ex);
}
finally
{
if (_aborted)
{
error ??= new ConnectionAbortedException();
}
await pipeWriter.CompleteAsync(error).ConfigureAwait(false);
}
}
private async Task ProcessReceivesAsync(WebSocket webSocket, PipeWriter pipeWriter)
{
while (true)
{
// Ensure we have some reasonable amount of buffer space
var buffer = pipeWriter.GetMemory();
var bytesRead = await webSocket.ReceiveAsync(buffer, CancellationToken.None).ConfigureAwait(false);
if (bytesRead.Count == 0)
{
break;
}
pipeWriter.Advance(bytesRead.Count);
var flushTask = pipeWriter.FlushAsync();
if (!flushTask.IsCompleted)
{
await flushTask.ConfigureAwait(false);
}
var result = flushTask.Result;
if (result.IsCompleted)
{
// Pipe consumer is shut down, do we stop writing
break;
}
}
}
private async Task<Exception?> DoSendAsync(WebSocket webSocket, PipeReader pipeReader)
{
Exception? error = null;
try
{
await ProcessSendsAsync(webSocket, pipeReader).ConfigureAwait(false);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
{
error = null;
}
catch (ObjectDisposedException)
{
error = null;
}
catch (IOException ex)
{
error = ex;
}
catch (Exception ex)
{
error = new IOException(ex.Message, ex);
}
finally
{
_aborted = true;
webSocket.Abort();
}
return error;
}
private async Task ProcessSendsAsync(WebSocket webSocket, PipeReader pipeReader)
{
while (true)
{
// Wait for data to write from the pipe producer
var result = await pipeReader.ReadAsync().ConfigureAwait(false);
var buffer = result.Buffer;
if (result.IsCanceled)
{
break;
}
var end = buffer.End;
var isCompleted = result.IsCompleted;
if (!buffer.IsEmpty)
{
await webSocket.SendAsync(buffer, WebSocketMessageType.Text).ConfigureAwait(false);
}
pipeReader.AdvanceTo(end);
if (isCompleted)
{
break;
}
}
}
}
我还没有详细研究 David Fowler 的框架,但我希望他使用与我相同的完成语义:当输出(套接字写入)管道完成时,然后关闭(网络)套接字.
在您的代码中,ProcessSendsAsync
将在管道完成时 return,这将导致 DoSendAsync
调用 webSocket.Abort
。我假设您应该修改 DoSendAsync
以在管道无异常完成时调用 CloseAsync
,并且仅在出现异常时调用 Abort
。
如果您还没有看过,this 是我讨论如何处理关机的视频,既异常又优雅。
我正在尝试创建一个管道网络套接字客户端,它将从交换中提取数据。该代码介于 David Fowler 的 BedrockFramework and @StephenCleary's TCP Chat 之间。我不确定 IDuplexPipe
(David 的方式)和普通管道(Stephen 的方式)之间是否有区别,我相信它们是表达同一事物的两种方式。
问题是我如何优雅地 stop/close 一切,即基本上我想创建一个 StopAsync
方法?附带问题,他们不使用 CancellationToken
的原因是什么?
public sealed class PipeSocketClient
{
private readonly string _baseUrl;
private readonly Pipe _inputPipe;
private readonly Pipe _outputPipe;
private readonly ClientWebSocket _webSocket;
private volatile bool _aborted;
public PipeSocketClient(string baseUrl, PipeOptions? pipeOptions = default)
{
_baseUrl = baseUrl;
_webSocket = new ClientWebSocket();
_inputPipe = new Pipe(pipeOptions ?? PipeOptions.Default);
_outputPipe = new Pipe();
}
public async ValueTask<PipeSocketClient> StartAsync()
{
await _webSocket.ConnectAsync(new Uri(_baseUrl), CancellationToken.None).ConfigureAwait(false);
_ = ExecuteAsync(_webSocket, _inputPipe.Writer, _outputPipe.Reader);
return this;
}
private async Task ExecuteAsync(WebSocket webSocket, PipeWriter pipeWriter, PipeReader pipeReader)
{
Exception? sendError = null;
try
{
// Spawn send and receive logic
var receiveTask = DoReceiveAsync(webSocket, pipeWriter);
var sendTask = DoSendAsync(webSocket, pipeReader);
// If the sending task completes then close the receive
// We don't need to do this in the other direction because the kestrel
// will trigger the output closing once the input is complete.
if (await Task.WhenAny(receiveTask, sendTask).ConfigureAwait(false) == sendTask)
{
// Tell the reader it's being aborted
webSocket.Dispose();
}
// Now wait for both to complete
await receiveTask.ConfigureAwait(false);
sendError = await sendTask.ConfigureAwait(false);
// Dispose the socket(should noop if already called)
webSocket.Dispose();
}
catch (Exception ex)
{
Console.WriteLine($"Unexpected exception: {ex.Message}");
}
finally
{
// Complete the output after disposing the socket
await pipeReader.CompleteAsync(sendError).ConfigureAwait(false);
}
}
private async Task DoReceiveAsync(WebSocket webSocket, PipeWriter pipeWriter)
{
Exception? error = null;
try
{
await ProcessReceivesAsync(webSocket, pipeWriter).ConfigureAwait(false);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
{
error = new ConnectionResetException(ex.Message, ex);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted ||
ex.SocketErrorCode == SocketError.ConnectionAborted ||
ex.SocketErrorCode == SocketError.Interrupted ||
ex.SocketErrorCode == SocketError.InvalidArgument)
{
if (!_aborted)
{
// Calling Dispose after ReceiveAsync can cause an "InvalidArgument" error on *nix.
error = new ConnectionAbortedException();
}
}
catch (ObjectDisposedException)
{
if (!_aborted)
{
error = new ConnectionAbortedException();
}
}
catch (IOException ex)
{
error = ex;
}
catch (Exception ex)
{
error = new IOException(ex.Message, ex);
}
finally
{
if (_aborted)
{
error ??= new ConnectionAbortedException();
}
await pipeWriter.CompleteAsync(error).ConfigureAwait(false);
}
}
private async Task ProcessReceivesAsync(WebSocket webSocket, PipeWriter pipeWriter)
{
while (true)
{
// Ensure we have some reasonable amount of buffer space
var buffer = pipeWriter.GetMemory();
var bytesRead = await webSocket.ReceiveAsync(buffer, CancellationToken.None).ConfigureAwait(false);
if (bytesRead.Count == 0)
{
break;
}
pipeWriter.Advance(bytesRead.Count);
var flushTask = pipeWriter.FlushAsync();
if (!flushTask.IsCompleted)
{
await flushTask.ConfigureAwait(false);
}
var result = flushTask.Result;
if (result.IsCompleted)
{
// Pipe consumer is shut down, do we stop writing
break;
}
}
}
private async Task<Exception?> DoSendAsync(WebSocket webSocket, PipeReader pipeReader)
{
Exception? error = null;
try
{
await ProcessSendsAsync(webSocket, pipeReader).ConfigureAwait(false);
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
{
error = null;
}
catch (ObjectDisposedException)
{
error = null;
}
catch (IOException ex)
{
error = ex;
}
catch (Exception ex)
{
error = new IOException(ex.Message, ex);
}
finally
{
_aborted = true;
webSocket.Abort();
}
return error;
}
private async Task ProcessSendsAsync(WebSocket webSocket, PipeReader pipeReader)
{
while (true)
{
// Wait for data to write from the pipe producer
var result = await pipeReader.ReadAsync().ConfigureAwait(false);
var buffer = result.Buffer;
if (result.IsCanceled)
{
break;
}
var end = buffer.End;
var isCompleted = result.IsCompleted;
if (!buffer.IsEmpty)
{
await webSocket.SendAsync(buffer, WebSocketMessageType.Text).ConfigureAwait(false);
}
pipeReader.AdvanceTo(end);
if (isCompleted)
{
break;
}
}
}
}
我还没有详细研究 David Fowler 的框架,但我希望他使用与我相同的完成语义:当输出(套接字写入)管道完成时,然后关闭(网络)套接字.
在您的代码中,ProcessSendsAsync
将在管道完成时 return,这将导致 DoSendAsync
调用 webSocket.Abort
。我假设您应该修改 DoSendAsync
以在管道无异常完成时调用 CloseAsync
,并且仅在出现异常时调用 Abort
。
如果您还没有看过,this 是我讨论如何处理关机的视频,既异常又优雅。