异步NetworkStream 连读设计模式

Asynchronous NetworkStream continuous reading design pattern

我正在尝试将使用多个线程的 class 转换为使用重叠 I/O。它几乎可以正常工作,但它似乎随机遇到线程问题,我不确定为什么。

直接 post 的代码太多,但这是基本模式。目标是坐在那里从连接读取数据直到连接被处理,所以当每个 EndRead() 完成时,它应该开始一个新的 BeginRead().

public enum State
{
    Idle,
    BeforeRead,
    PendingRead,
    FinishingRead,
    Died,
}

private int state;
private IAsyncResult asyncResult;
private byte[] readBuffer = new byte[4096];
private System.Net.Sockets.NetworkStream stream;
public void Connect(System.Net.Sockets.TcpClient client, string host, int port)
{
    client.Connect(host, port);
    this.stream = client.GetStream();
}

private bool SetState(State expectedState, State newState)
{
    return Interlocked.CompareExchange(ref this.state, (int)newState, (int)expectedState) == expectedState;
}
public void BeginRead()
{
    try
    {
        while (true)
        {
            if (!SetState(State.Idle, State.BeforeRead))
                return;
            IAsyncResult async;
            async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);
            if (async == null)
                return;
            SetState(State.BeforeRead, State.PendingRead);
            lock (this)
                this.asyncResult = async;
            if (async.AsyncWaitHandle.WaitOne(0))
                EndRead(false);
        }
    }
    catch { this.state = State.Died; }
}
private void EndRead(bool asynchronousCallback)
{
    try
    {
        if (!SetState(State.PendingRead, State.FinishingRead))
            return;
        IAsyncResult async;
        lock (this)
        {
            async = this.asyncResult;
            this.asyncResult = null;
        }
        if (async == null)
            return;
        int bytesRead = stream.EndRead(async);
        HandleData(bytesRead, readBuffer);
        SetState(State.FinishingRead, State.Idle);
        if (asynchronousCallback)
            BeginRead();
    }
    catch { this.state = State.Died; }
}

大部分时间它都有效,但偶尔它会做以下事情之一:

我还应该提到另一个线程正在进行同步写入(stream.Write,而不是 stream.BeginWrite)。我认为读和写应该是相互独立的,所以不应该影响行为。

我的设计是否存在根本性缺陷?这是一个精简的示例,所以我删除的内容可能会导致问题,但我需要知道我的基本设计是否有效。异步链式读取的正确方法是什么?

(如果建议使用 async/await,此代码需要在 Windows XP 上 运行,因此这不是一个选项。)

您有竞争条件:

        IAsyncResult async;
        async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);

        /* Race condition here */   

        if (async == null)
            return;
        SetState(State.BeforeRead, State.PendingRead);
        lock (this)
            this.asyncResult = async;

您的 EndRead 可以在 SetState and/or 之前执行 this.asyncResult = async 之前执行。你不能做这个。必须在 发出 BeginRead 之前设置状态 ,并在失败时重置。不要保留和使用成员 asyncResult,而是将回调传递给 BeginRead 并在回调上获取异步结果:

  SetState(State.BeforeRead, State.PendingRead);
  stream.BeginRead(readBuffer, 0, readBuffer.Length, EndRead);

 ...

  private void EndRead(IAsyncResult asyncResult) {
     int bytesRead = stream.EndRead(asyncResult);
     ...
  }