C# TcpClient 通过持久连接读取多条消息
C# TcpClient reading multiple messages over persistent connection
我正在尝试创建一个具有持久连接的 TCP 服务器和客户端,以便服务器和客户端在任何时间点都可以相互通知某些 'events'(所以推送而不是轮询).
我几乎一切正常,客户端可以连接,连接保持打开状态,客户端和服务器都可以写入和读取 tcp 流。
问题出在读取上,我通过首先发送包含消息长度的 8 个字节来定义消息边界。
一旦我收到它,就会读取长度为 x 的消息并引发一个事件。
一切正常,但是一旦消息被读取,我希望 "await stream.ReadAsync" 等待新的传入数据,但它一直循环(和 returns 0 数据)而不是等待导致 100 % cpu 使用率。
有没有办法对流说 'Reset' 以便它像原来那样再次开始等待。
这是我的tcpclient的代码(用于发送和接收),你可以跳到RunListener方法,我认为其他的都不重要。
public class SSLTcpClient : IDisposable {
/**
* Public fields
*/
public SslStream SslStream { get; private set; }
/**
* Events
*/
public ConnectionHandler connected;
public ConnectionHandler disconnected;
public DataTransfer dataReceived;
/**
* Constructors
*/
public SSLTcpClient() { }
public SSLTcpClient(TcpClient pClient, X509Certificate2 pCert) {
SslStream = new SslStream(
pClient.GetStream(),
false,
new RemoteCertificateValidationCallback(
delegate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
return true;
}
),
new LocalCertificateSelectionCallback(
delegate(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers) {
return new X509Certificate2(pCert);
}
)
);
try {
SslStream.AuthenticateAsServer(pCert, true, SslProtocols.Tls, true);
} catch (AuthenticationException) {
pClient.Close();
return;
}
Thread objThread = new Thread(new ThreadStart(RunListener));
objThread.Start();
if (connected != null) {
connected(this);
}
}
/**
* Connect the TcpClient
*/
public bool ConnectAsync(IPAddress pIP, int pPort, string pX509CertificatePath, string pX509CertificatePassword) {
TcpClient objClient = new TcpClient();
try {
if(!objClient.ConnectAsync(pIP, pPort).Wait(1000)) {
throw new Exception("Connect failed");
};
} catch (Exception) {
return false;
}
X509Certificate2 clientCertificate;
X509Certificate2Collection clientCertificatecollection = new X509Certificate2Collection();
try {
clientCertificate = new X509Certificate2(pX509CertificatePath, pX509CertificatePassword);
clientCertificatecollection.Add(clientCertificate);
} catch(CryptographicException) {
objClient.Close();
return false;
}
SslStream = new SslStream(
objClient.GetStream(),
false,
new RemoteCertificateValidationCallback(
delegate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
return true;
}
),
new LocalCertificateSelectionCallback(
delegate(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers) {
var cert = new X509Certificate2(pX509CertificatePath, pX509CertificatePassword);
return cert;
}
)
);
try {
SslStream.AuthenticateAsClient(pIP.ToString(), clientCertificatecollection, SslProtocols.Tls, false);
} catch (AuthenticationException) {
objClient.Close();
return false;
}
Thread objThread = new Thread(new ThreadStart(RunListener));
objThread.Start();
if (connected != null) {
connected(this);
}
return true;
}
/**
* Reading
*/
private async void RunListener() {
try {
while (true) {
byte[] bytes = new byte[8];
await SslStream.ReadAsync(bytes, 0, (int)bytes.Length);
int bufLenght = BitConverter.ToInt32(bytes, 0);
if (bufLenght > 0) {
byte[] buffer = new byte[bufLenght];
await SslStream.ReadAsync(buffer, 0, bufLenght);
if (dataReceived != null) {
dataReceived(this, buffer);
}
}
}
} catch (Exception) {
Dispose();
}
}
/**
* Writing
*/
public bool Send(byte[] pData) {
try {
byte[] lenght = BitConverter.GetBytes(pData.Length);
Array.Resize(ref lenght, 8);
SslStream.Write(lenght);
if (!SslStream.WriteAsync(pData, 0, pData.Length).Wait(1000)) {
throw new Exception("Send timed out");
}
} catch (Exception) {
Dispose();
return false;
}
return true;
}
public bool Send(string pData) {
byte[] bytes = System.Text.Encoding.UTF8.GetBytes(pData);
return Send(bytes);
}
/**
* Shutdown
*/
public void Dispose() {
SslStream.Close();
if (disconnected != null) {
disconnected(this);
}
}
}
只有 2 个想法有望帮助改进您的代码,但无法回答您最初的问题:
- 您要发送 8 个字节指示以下有效负载长度,但在以下
BitConverter.ToInt32
调用中仅使用其中的 4 个字节,因此 4 个字节就足够了。
- 如果从另一端切断传输会怎样?在我看来,您无法确定您收到的数据是否有效。也许构建类似小型低级协议的东西会有所帮助,例如
4 bytes raw data length
,然后是 raw data
本身,然后是 some bytes of checksum
(这将允许验证您收到的数据是否已正确传输)。
你读取4或8个字节的方式是错误的。你需要循环直到你真正得到它们。你可能会得到 1.
您在这里和其他地方假设您将阅读所需的数量。您将至少读取一个字节,如果连接被远程端关闭,则读取零字节。
或许,您应该使用 BinaryReader
来抽象掉循环。
此外,您需要清理资源。为什么不将它们包装在 using
中?所有 Close
调用都不安全且不需要。
我也不明白为什么这里需要控制流异常。重构它。
我正在尝试创建一个具有持久连接的 TCP 服务器和客户端,以便服务器和客户端在任何时间点都可以相互通知某些 'events'(所以推送而不是轮询).
我几乎一切正常,客户端可以连接,连接保持打开状态,客户端和服务器都可以写入和读取 tcp 流。
问题出在读取上,我通过首先发送包含消息长度的 8 个字节来定义消息边界。 一旦我收到它,就会读取长度为 x 的消息并引发一个事件。
一切正常,但是一旦消息被读取,我希望 "await stream.ReadAsync" 等待新的传入数据,但它一直循环(和 returns 0 数据)而不是等待导致 100 % cpu 使用率。
有没有办法对流说 'Reset' 以便它像原来那样再次开始等待。
这是我的tcpclient的代码(用于发送和接收),你可以跳到RunListener方法,我认为其他的都不重要。
public class SSLTcpClient : IDisposable {
/**
* Public fields
*/
public SslStream SslStream { get; private set; }
/**
* Events
*/
public ConnectionHandler connected;
public ConnectionHandler disconnected;
public DataTransfer dataReceived;
/**
* Constructors
*/
public SSLTcpClient() { }
public SSLTcpClient(TcpClient pClient, X509Certificate2 pCert) {
SslStream = new SslStream(
pClient.GetStream(),
false,
new RemoteCertificateValidationCallback(
delegate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
return true;
}
),
new LocalCertificateSelectionCallback(
delegate(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers) {
return new X509Certificate2(pCert);
}
)
);
try {
SslStream.AuthenticateAsServer(pCert, true, SslProtocols.Tls, true);
} catch (AuthenticationException) {
pClient.Close();
return;
}
Thread objThread = new Thread(new ThreadStart(RunListener));
objThread.Start();
if (connected != null) {
connected(this);
}
}
/**
* Connect the TcpClient
*/
public bool ConnectAsync(IPAddress pIP, int pPort, string pX509CertificatePath, string pX509CertificatePassword) {
TcpClient objClient = new TcpClient();
try {
if(!objClient.ConnectAsync(pIP, pPort).Wait(1000)) {
throw new Exception("Connect failed");
};
} catch (Exception) {
return false;
}
X509Certificate2 clientCertificate;
X509Certificate2Collection clientCertificatecollection = new X509Certificate2Collection();
try {
clientCertificate = new X509Certificate2(pX509CertificatePath, pX509CertificatePassword);
clientCertificatecollection.Add(clientCertificate);
} catch(CryptographicException) {
objClient.Close();
return false;
}
SslStream = new SslStream(
objClient.GetStream(),
false,
new RemoteCertificateValidationCallback(
delegate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
return true;
}
),
new LocalCertificateSelectionCallback(
delegate(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers) {
var cert = new X509Certificate2(pX509CertificatePath, pX509CertificatePassword);
return cert;
}
)
);
try {
SslStream.AuthenticateAsClient(pIP.ToString(), clientCertificatecollection, SslProtocols.Tls, false);
} catch (AuthenticationException) {
objClient.Close();
return false;
}
Thread objThread = new Thread(new ThreadStart(RunListener));
objThread.Start();
if (connected != null) {
connected(this);
}
return true;
}
/**
* Reading
*/
private async void RunListener() {
try {
while (true) {
byte[] bytes = new byte[8];
await SslStream.ReadAsync(bytes, 0, (int)bytes.Length);
int bufLenght = BitConverter.ToInt32(bytes, 0);
if (bufLenght > 0) {
byte[] buffer = new byte[bufLenght];
await SslStream.ReadAsync(buffer, 0, bufLenght);
if (dataReceived != null) {
dataReceived(this, buffer);
}
}
}
} catch (Exception) {
Dispose();
}
}
/**
* Writing
*/
public bool Send(byte[] pData) {
try {
byte[] lenght = BitConverter.GetBytes(pData.Length);
Array.Resize(ref lenght, 8);
SslStream.Write(lenght);
if (!SslStream.WriteAsync(pData, 0, pData.Length).Wait(1000)) {
throw new Exception("Send timed out");
}
} catch (Exception) {
Dispose();
return false;
}
return true;
}
public bool Send(string pData) {
byte[] bytes = System.Text.Encoding.UTF8.GetBytes(pData);
return Send(bytes);
}
/**
* Shutdown
*/
public void Dispose() {
SslStream.Close();
if (disconnected != null) {
disconnected(this);
}
}
}
只有 2 个想法有望帮助改进您的代码,但无法回答您最初的问题:
- 您要发送 8 个字节指示以下有效负载长度,但在以下
BitConverter.ToInt32
调用中仅使用其中的 4 个字节,因此 4 个字节就足够了。 - 如果从另一端切断传输会怎样?在我看来,您无法确定您收到的数据是否有效。也许构建类似小型低级协议的东西会有所帮助,例如
4 bytes raw data length
,然后是raw data
本身,然后是some bytes of checksum
(这将允许验证您收到的数据是否已正确传输)。
你读取4或8个字节的方式是错误的。你需要循环直到你真正得到它们。你可能会得到 1.
您在这里和其他地方假设您将阅读所需的数量。您将至少读取一个字节,如果连接被远程端关闭,则读取零字节。
或许,您应该使用 BinaryReader
来抽象掉循环。
此外,您需要清理资源。为什么不将它们包装在 using
中?所有 Close
调用都不安全且不需要。
我也不明白为什么这里需要控制流异常。重构它。