异步NetworkStream 连读设计模式
Asynchronous NetworkStream continuous reading design pattern
我正在尝试将使用多个线程的 class 转换为使用重叠 I/O。它几乎可以正常工作,但它似乎随机遇到线程问题,我不确定为什么。
直接 post 的代码太多,但这是基本模式。目标是坐在那里从连接读取数据直到连接被处理,所以当每个 EndRead()
完成时,它应该开始一个新的 BeginRead()
.
public enum State
{
Idle,
BeforeRead,
PendingRead,
FinishingRead,
Died,
}
private int state;
private IAsyncResult asyncResult;
private byte[] readBuffer = new byte[4096];
private System.Net.Sockets.NetworkStream stream;
public void Connect(System.Net.Sockets.TcpClient client, string host, int port)
{
client.Connect(host, port);
this.stream = client.GetStream();
}
private bool SetState(State expectedState, State newState)
{
return Interlocked.CompareExchange(ref this.state, (int)newState, (int)expectedState) == expectedState;
}
public void BeginRead()
{
try
{
while (true)
{
if (!SetState(State.Idle, State.BeforeRead))
return;
IAsyncResult async;
async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);
if (async == null)
return;
SetState(State.BeforeRead, State.PendingRead);
lock (this)
this.asyncResult = async;
if (async.AsyncWaitHandle.WaitOne(0))
EndRead(false);
}
}
catch { this.state = State.Died; }
}
private void EndRead(bool asynchronousCallback)
{
try
{
if (!SetState(State.PendingRead, State.FinishingRead))
return;
IAsyncResult async;
lock (this)
{
async = this.asyncResult;
this.asyncResult = null;
}
if (async == null)
return;
int bytesRead = stream.EndRead(async);
HandleData(bytesRead, readBuffer);
SetState(State.FinishingRead, State.Idle);
if (asynchronousCallback)
BeginRead();
}
catch { this.state = State.Died; }
}
大部分时间它都有效,但偶尔它会做以下事情之一:
- 停止接收消息
- 抛出一个异常,我
asyncResult has already been handled: "EndReceive can only be called once for each asynchronous operation"
.
我还应该提到另一个线程正在进行同步写入(stream.Write,而不是 stream.BeginWrite
)。我认为读和写应该是相互独立的,所以不应该影响行为。
我的设计是否存在根本性缺陷?这是一个精简的示例,所以我删除的内容可能会导致问题,但我需要知道我的基本设计是否有效。异步链式读取的正确方法是什么?
(如果建议使用 async/await
,此代码需要在 Windows XP 上 运行,因此这不是一个选项。)
您有竞争条件:
IAsyncResult async;
async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);
/* Race condition here */
if (async == null)
return;
SetState(State.BeforeRead, State.PendingRead);
lock (this)
this.asyncResult = async;
您的 EndRead
可以在 SetState
and/or 之前执行 this.asyncResult = async
之前执行。你不能做这个。必须在 发出 BeginRead
之前设置状态 ,并在失败时重置。不要保留和使用成员 asyncResult
,而是将回调传递给 BeginRead
并在回调上获取异步结果:
SetState(State.BeforeRead, State.PendingRead);
stream.BeginRead(readBuffer, 0, readBuffer.Length, EndRead);
...
private void EndRead(IAsyncResult asyncResult) {
int bytesRead = stream.EndRead(asyncResult);
...
}
我正在尝试将使用多个线程的 class 转换为使用重叠 I/O。它几乎可以正常工作,但它似乎随机遇到线程问题,我不确定为什么。
直接 post 的代码太多,但这是基本模式。目标是坐在那里从连接读取数据直到连接被处理,所以当每个 EndRead()
完成时,它应该开始一个新的 BeginRead()
.
public enum State
{
Idle,
BeforeRead,
PendingRead,
FinishingRead,
Died,
}
private int state;
private IAsyncResult asyncResult;
private byte[] readBuffer = new byte[4096];
private System.Net.Sockets.NetworkStream stream;
public void Connect(System.Net.Sockets.TcpClient client, string host, int port)
{
client.Connect(host, port);
this.stream = client.GetStream();
}
private bool SetState(State expectedState, State newState)
{
return Interlocked.CompareExchange(ref this.state, (int)newState, (int)expectedState) == expectedState;
}
public void BeginRead()
{
try
{
while (true)
{
if (!SetState(State.Idle, State.BeforeRead))
return;
IAsyncResult async;
async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);
if (async == null)
return;
SetState(State.BeforeRead, State.PendingRead);
lock (this)
this.asyncResult = async;
if (async.AsyncWaitHandle.WaitOne(0))
EndRead(false);
}
}
catch { this.state = State.Died; }
}
private void EndRead(bool asynchronousCallback)
{
try
{
if (!SetState(State.PendingRead, State.FinishingRead))
return;
IAsyncResult async;
lock (this)
{
async = this.asyncResult;
this.asyncResult = null;
}
if (async == null)
return;
int bytesRead = stream.EndRead(async);
HandleData(bytesRead, readBuffer);
SetState(State.FinishingRead, State.Idle);
if (asynchronousCallback)
BeginRead();
}
catch { this.state = State.Died; }
}
大部分时间它都有效,但偶尔它会做以下事情之一:
- 停止接收消息
- 抛出一个异常,我
asyncResult has already been handled: "EndReceive can only be called once for each asynchronous operation"
.
我还应该提到另一个线程正在进行同步写入(stream.Write,而不是 stream.BeginWrite
)。我认为读和写应该是相互独立的,所以不应该影响行为。
我的设计是否存在根本性缺陷?这是一个精简的示例,所以我删除的内容可能会导致问题,但我需要知道我的基本设计是否有效。异步链式读取的正确方法是什么?
(如果建议使用 async/await
,此代码需要在 Windows XP 上 运行,因此这不是一个选项。)
您有竞争条件:
IAsyncResult async;
async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);
/* Race condition here */
if (async == null)
return;
SetState(State.BeforeRead, State.PendingRead);
lock (this)
this.asyncResult = async;
您的 EndRead
可以在 SetState
and/or 之前执行 this.asyncResult = async
之前执行。你不能做这个。必须在 发出 BeginRead
之前设置状态 ,并在失败时重置。不要保留和使用成员 asyncResult
,而是将回调传递给 BeginRead
并在回调上获取异步结果:
SetState(State.BeforeRead, State.PendingRead);
stream.BeginRead(readBuffer, 0, readBuffer.Length, EndRead);
...
private void EndRead(IAsyncResult asyncResult) {
int bytesRead = stream.EndRead(asyncResult);
...
}