C#重连NetworkStream时如何协调读写线程?
C# How to coordinate read and write threads while reconnecting NetworkStream?
我有一个线程向 NetworkStream 写入请求。另一个线程正在读取来自该流的回复。
我想让它容错。如果网络出现故障,我希望 NetworkStream 被处理掉,换成一个全新的。
我让两个线程处理 IO/Socket 异常。他们每个人都会尝试重新建立连接。我正在努力协调这两个线程。我不得不放置锁定部分,使代码相当复杂且容易出错。
是否有推荐的实现方法?也许使用单线程但使读取或写入异步?
我发现最简单的方法是让一个线程处理协调和写入,然后使用异步处理读取。通过在 AsyncState
对象中包含 CancellationTokenSource
,reader 代码可以在接收方遇到错误(调用 EndRead
或者如果流完成)。 coordination/writer 位于创建连接的循环中,然后循环使用要发送的 BlockingCollection<T>
项目。通过使用 BlockingCollection.GetConsumingEnumerable(token)
可以在 reader 遇到错误时取消发送。
private class AsyncState
{
public byte[] Buffer { get; set; }
public NetworkStream NetworkStream { get; set; }
public CancellationTokenSource CancellationTokenSource { get; set; }
}
创建连接后,您可以启动异步读取过程(只要一切正常,它就会不断调用自身)。在状态对象中传递缓冲区、流和 CancellationTokenSource
:
var buffer = new byte[1];
stream.BeginRead(buffer, 0, 1, Callback,
new AsyncState
{
Buffer = buffer,
NetworkStream = stream,
CancellationTokenSource = cts2
});
之后您开始从输出队列中读取并写入流,直到取消或发生故障:
using (var writer = new StreamWriter(stream, Encoding.ASCII, 80, true))
{
foreach (var item in this.sendQueue.GetConsumingEnumerable(cancellationToken))
{
...
...并且在回调中,您可以检查是否存在故障,并在必要时点击 CancellationTokenSource 以向编写器线程发出重新启动连接的信号。
private void Callback(IAsyncResult ar)
{
var state = (AsyncState)ar.AsyncState;
if (ar.IsCompleted)
{
try
{
int bytesRead = state.NetworkStream.EndRead(ar);
LogState("Post read ", state.NetworkStream);
}
catch (Exception ex)
{
Log.Warn("Exception during EndRead", ex);
state.CancellationTokenSource.Cancel();
return;
}
// Deal with the character received
char c = (char)state.Buffer[0];
if (c < 0)
{
Log.Warn("c < 0, stream closing");
state.CancellationTokenSource.Cancel();
return;
}
... deal with the character here, building up a buffer and
... handing it out to the application when completed
... perhaps using Rx Subject<T> to make it easy to subscribe
... and finally ask for the next byte with the same Callback
// Launch the next reader
var buffer2 = new byte[1];
var state2 = state.WithNewBuffer(buffer2);
state.NetworkStream.BeginRead(buffer2, 0, 1, Callback, state2);
我有一个线程向 NetworkStream 写入请求。另一个线程正在读取来自该流的回复。
我想让它容错。如果网络出现故障,我希望 NetworkStream 被处理掉,换成一个全新的。
我让两个线程处理 IO/Socket 异常。他们每个人都会尝试重新建立连接。我正在努力协调这两个线程。我不得不放置锁定部分,使代码相当复杂且容易出错。
是否有推荐的实现方法?也许使用单线程但使读取或写入异步?
我发现最简单的方法是让一个线程处理协调和写入,然后使用异步处理读取。通过在 AsyncState
对象中包含 CancellationTokenSource
,reader 代码可以在接收方遇到错误(调用 EndRead
或者如果流完成)。 coordination/writer 位于创建连接的循环中,然后循环使用要发送的 BlockingCollection<T>
项目。通过使用 BlockingCollection.GetConsumingEnumerable(token)
可以在 reader 遇到错误时取消发送。
private class AsyncState
{
public byte[] Buffer { get; set; }
public NetworkStream NetworkStream { get; set; }
public CancellationTokenSource CancellationTokenSource { get; set; }
}
创建连接后,您可以启动异步读取过程(只要一切正常,它就会不断调用自身)。在状态对象中传递缓冲区、流和 CancellationTokenSource
:
var buffer = new byte[1];
stream.BeginRead(buffer, 0, 1, Callback,
new AsyncState
{
Buffer = buffer,
NetworkStream = stream,
CancellationTokenSource = cts2
});
之后您开始从输出队列中读取并写入流,直到取消或发生故障:
using (var writer = new StreamWriter(stream, Encoding.ASCII, 80, true))
{
foreach (var item in this.sendQueue.GetConsumingEnumerable(cancellationToken))
{
...
...并且在回调中,您可以检查是否存在故障,并在必要时点击 CancellationTokenSource 以向编写器线程发出重新启动连接的信号。
private void Callback(IAsyncResult ar)
{
var state = (AsyncState)ar.AsyncState;
if (ar.IsCompleted)
{
try
{
int bytesRead = state.NetworkStream.EndRead(ar);
LogState("Post read ", state.NetworkStream);
}
catch (Exception ex)
{
Log.Warn("Exception during EndRead", ex);
state.CancellationTokenSource.Cancel();
return;
}
// Deal with the character received
char c = (char)state.Buffer[0];
if (c < 0)
{
Log.Warn("c < 0, stream closing");
state.CancellationTokenSource.Cancel();
return;
}
... deal with the character here, building up a buffer and
... handing it out to the application when completed
... perhaps using Rx Subject<T> to make it easy to subscribe
... and finally ask for the next byte with the same Callback
// Launch the next reader
var buffer2 = new byte[1];
var state2 = state.WithNewBuffer(buffer2);
state.NetworkStream.BeginRead(buffer2, 0, 1, Callback, state2);