C# TcpClient 通过持久连接读取多条消息

C# TcpClient reading multiple messages over persistent connection

我正在尝试创建一个具有持久连接的 TCP 服务器和客户端,以便服务器和客户端在任何时间点都可以相互通知某些 'events'(所以推送而不是轮询).

我几乎一切正常,客户端可以连接,连接保持打开状态,客户端和服务器都可以写入和读取 tcp 流。

问题出在读取上,我通过首先发送包含消息长度的 8 个字节来定义消息边界。 一旦我收到它,就会读取长度为 x 的消息并引发一个事件。

一切正常,但是一旦消息被读取,我希望 "await stream.ReadAsync" 等待新的传入数据,但它一直循环(和 returns 0 数据)而不是等待导致 100 % cpu 使用率。

有没有办法对流说 'Reset' 以便它像原来那样再次开始等待。

这是我的tcpclient的代码(用于发送和接收),你可以跳到RunListener方法,我认为其他的都不重要。

    public class SSLTcpClient : IDisposable {
        /**
         * Public fields
         */
        public SslStream SslStream { get; private set; }

        /**
         * Events
         */
        public ConnectionHandler connected;
        public ConnectionHandler disconnected;
        public DataTransfer dataReceived;

        /**
         * Constructors
         */
        public SSLTcpClient() { }
        public SSLTcpClient(TcpClient pClient, X509Certificate2 pCert) {
            SslStream = new SslStream(
                pClient.GetStream(),
                false,
                new RemoteCertificateValidationCallback(
                    delegate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
                        return true;
                    }
                ),
                new LocalCertificateSelectionCallback(
                    delegate(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers) {
                        return new X509Certificate2(pCert);
                    }
                )
            );

            try {
                SslStream.AuthenticateAsServer(pCert, true, SslProtocols.Tls, true);
            } catch (AuthenticationException) {
                pClient.Close();
                return;
            }

            Thread objThread = new Thread(new ThreadStart(RunListener));
            objThread.Start();

            if (connected != null) {
                connected(this);
            }
        }

        /**
         * Connect the TcpClient
         */
        public bool ConnectAsync(IPAddress pIP, int pPort, string pX509CertificatePath, string pX509CertificatePassword) {
            TcpClient objClient = new TcpClient();
            try {
                if(!objClient.ConnectAsync(pIP, pPort).Wait(1000)) {
                    throw new Exception("Connect failed");
                };
            } catch (Exception) {
                return false;
            }
            X509Certificate2 clientCertificate;
            X509Certificate2Collection clientCertificatecollection = new X509Certificate2Collection();
            try {
                clientCertificate = new X509Certificate2(pX509CertificatePath, pX509CertificatePassword);
                clientCertificatecollection.Add(clientCertificate);
             } catch(CryptographicException) {
                objClient.Close();
                return false;
            }

            SslStream = new SslStream(
                objClient.GetStream(), 
                false, 
                new RemoteCertificateValidationCallback(
                    delegate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) { 
                        return true;
                    }
                ), 
                new LocalCertificateSelectionCallback(
                    delegate(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers) {
                        var cert = new X509Certificate2(pX509CertificatePath, pX509CertificatePassword);
                        return cert;
                    }
                )
            );

            try {
                SslStream.AuthenticateAsClient(pIP.ToString(), clientCertificatecollection, SslProtocols.Tls, false);
            } catch (AuthenticationException) {
                objClient.Close();
                return false;
            }

            Thread objThread = new Thread(new ThreadStart(RunListener));
            objThread.Start();

            if (connected != null) {
                connected(this);
            }
            return true;
        }

        /**
         * Reading
         */
        private async void RunListener() {
            try {
                while (true) {
                    byte[] bytes = new byte[8];
                    await SslStream.ReadAsync(bytes, 0, (int)bytes.Length);

                    int bufLenght = BitConverter.ToInt32(bytes, 0);
                    if (bufLenght > 0) {
                        byte[] buffer = new byte[bufLenght];
                        await SslStream.ReadAsync(buffer, 0, bufLenght);

                        if (dataReceived != null) {
                            dataReceived(this, buffer);
                        }
                    }
                }
            } catch (Exception) {
                Dispose();
            }
        }

        /**
         * Writing 
         */
        public bool Send(byte[] pData) {
            try {
                byte[] lenght = BitConverter.GetBytes(pData.Length);
                Array.Resize(ref lenght, 8);

                SslStream.Write(lenght);
                if (!SslStream.WriteAsync(pData, 0, pData.Length).Wait(1000)) {
                    throw new Exception("Send timed out");
                }
            } catch (Exception) {
                Dispose();
                return false;
            }
            return true;
        }

        public bool Send(string pData) {
            byte[] bytes = System.Text.Encoding.UTF8.GetBytes(pData);
            return Send(bytes);
        }

        /**
         * Shutdown
         */
        public void Dispose() {
            SslStream.Close();
            if (disconnected != null) {
                disconnected(this);
            }
        }
    }

只有 2 个想法有望帮助改进您的代码,但无法回答您最初的问题:

  • 您要发送 8 个字节指示以下有效负载长度,但在以下 BitConverter.ToInt32 调用中仅使用其中的 4 个字节,因此 4 个字节就足够了。
  • 如果从另一端切断传输会怎样?在我看来,您无法确定您收到的数据是否有效。也许构建类似小型低级协议的东西会有所帮助,例如4 bytes raw data length,然后是 raw data 本身,然后是 some bytes of checksum(这将允许验证您收到的数据是否已正确传输)。

你读取4或8个字节的方式是错误的。你需要循环直到你真正得到它们。你可能会得到 1.

您在这里和其他地方假设您将阅读所需的数量。您将至少读取一个字节,如果连接被远程端关闭,则读取零字节。

或许,您应该使用 BinaryReader 来抽象掉循环。

此外,您需要清理资源。为什么不将它们包装在 using 中?所有 Close 调用都不安全且不需要。

我也不明白为什么这里需要控制流异常。重构它。