C#重连NetworkStream时如何协调读写线程?

C# How to coordinate read and write threads while reconnecting NetworkStream?

我有一个线程向 NetworkStream 写入请求。另一个线程正在读取来自该流的回复。

我想让它容错。如果网络出现故障,我希望 NetworkStream 被处理掉,换成一个全新的。

我让两个线程处理 IO/Socket 异常。他们每个人都会尝试重新建立连接。我正在努力协调这两个线程。我不得不放置锁定部分,使代码相当复杂且容易出错。

是否有推荐的实现方法?也许使用单线程但使读取或写入异步?

我发现最简单的方法是让一个线程处理协调和写入,然后使用异步处理读取。通过在 AsyncState 对象中包含 CancellationTokenSource,reader 代码可以在接收方遇到错误(调用 EndRead 或者如果流完成)。 coordination/writer 位于创建连接的循环中,然后循环使用要发送的 BlockingCollection<T> 项目。通过使用 BlockingCollection.GetConsumingEnumerable(token) 可以在 reader 遇到错误时取消发送。

    private class AsyncState
    {
        public byte[] Buffer { get; set; }
        public NetworkStream NetworkStream { get; set; }
        public CancellationTokenSource CancellationTokenSource { get; set; }
    }

创建连接后,您可以启动异步读取过程(只要一切正常,它就会不断调用自身)。在状态对象中传递缓冲区、流和 CancellationTokenSource

var buffer = new byte[1];
stream.BeginRead(buffer, 0, 1, Callback,
   new AsyncState
   {
       Buffer = buffer,
       NetworkStream = stream,
       CancellationTokenSource = cts2
   });

之后您开始从输出队列中读取并写入流,直到取消或发生故障:

using (var writer = new StreamWriter(stream, Encoding.ASCII, 80, true))
{
    foreach (var item in this.sendQueue.GetConsumingEnumerable(cancellationToken))
    {
       ...

...并且在回调中,您可以检查是否存在故障,并在必要时点击 CancellationTokenSource 以向编写器线程发出重新启动连接的信号。

private void Callback(IAsyncResult ar)
{
  var state = (AsyncState)ar.AsyncState;

  if (ar.IsCompleted)
  {
    try
    {
        int bytesRead = state.NetworkStream.EndRead(ar);
        LogState("Post read ", state.NetworkStream);
    }
    catch (Exception ex)
    {
        Log.Warn("Exception during EndRead", ex);
        state.CancellationTokenSource.Cancel();
        return;
    }

    // Deal with the character received
    char c = (char)state.Buffer[0];

    if (c < 0)
    {
        Log.Warn("c < 0, stream closing");
        state.CancellationTokenSource.Cancel();
        return;
    }

    ... deal with the character here, building up a buffer and
    ... handing it out to the application when completed
    ... perhaps using Rx Subject<T> to make it easy to subscribe

    ... and finally ask for the next byte with the same Callback

    // Launch the next reader
    var buffer2 = new byte[1];
    var state2 = state.WithNewBuffer(buffer2);
    state.NetworkStream.BeginRead(buffer2, 0, 1, Callback, state2);