StreamSocket C# 客户端只能向服务器写入一次

StreamSocket C# Client is only able to write one time to server

我一直在尝试使用 C# 中的 StreamSockets 设置客户端-服务器应用程序。我能够最初连接到服务器 (ConnectAsync) 并随后写入和读取流。如果客户端使用 WriteToServer 方法向服务器发送另一个流,则不会触发服务器端的事件 (SocketListener_ConnectionReceived)。我正在向服务器发送一个 xmlSerialized 对象 "Message"。

调试时我没有收到任何错误。在客户端,虽然在使用 "WriteToServer" 并前进到 "ReadFromServer" 之后,代码显然卡在了下面的行中,因为服务器没有回复

            int bufferLength = inStream.ReadByte();

希望有人来电帮忙。老实说,我不确定问题出在哪里,因为 "Write"-方法在客户端两次尝试写入服务器时都以相同的方式使用。

客户端是一台Windows 10台电脑,服务器是一台Raspberry pi 运行 Windows 10台物联网。

处理连接的客户端应用程序中的 class 如下所示。

StreamSocket socket;
    HostName remoteHostName, localHostName;
    string serviceAddress = "1337";
    EndpointPair endPointPair;
    Boolean isConnected;
    Socket test;


    public Connection()
    {
        remoteHostName = new HostName("172.16.20.202"); // might have to change based on pi's ip address
        //remoteHostName = new HostName("172.16.20.4");
        localHostName = new HostName(getLocalIpAddress());

        endPointPair = new EndpointPair(localHostName, serviceAddress, remoteHostName, serviceAddress);

        socket = new StreamSocket();
        socket.Control.NoDelay = true;
        socket.Control.QualityOfService = SocketQualityOfService.LowLatency;


    }

    private string getLocalIpAddress()
    {
        var icp = NetworkInformation.GetInternetConnectionProfile();

        if (icp?.NetworkAdapter == null) return null;
        var hostname =
            NetworkInformation.GetHostNames()
                .SingleOrDefault(
                    hn =>
                        hn.IPInformation?.NetworkAdapter != null && hn.IPInformation.NetworkAdapter.NetworkAdapterId
                        == icp.NetworkAdapter.NetworkAdapterId);

        // the ip address
        return hostname?.CanonicalName;
    }

    public async Task StartConnection()
    {
        try
        {
            if (isConnected)
            {
                await socket.CancelIOAsync();
                socket.Dispose();
                socket = null;
                isConnected = false;
            }
            await socket.ConnectAsync(endPointPair);
            isConnected = true;
        }
        catch (Exception exc)
        {
            if (Windows.Networking.Sockets.SocketError.GetStatus(exc.HResult) == SocketErrorStatus.Unknown)
            {
                throw;
            }
            Debug.WriteLine("Connect failed with error: " + exc.Message);

            socket.Dispose();
            isConnected = false;
            socket = null;
            //return null;
        }
    }

    public async Task WriteToServer(Message msg)
    {
        try
        {
            using (DataWriter writer = new DataWriter(socket.OutputStream))
            {
                writer.WriteBytes(serialize(msg));
                await writer.StoreAsync();
                writer.DetachStream();
                writer.Dispose();
            }
        }
        catch (Exception exc)
        {
            Debug.WriteLine("Write failed with error: " + exc.Message);
        }

    }
    public async Task<Library.Message> ReadFromServer()
    {
        try
        {
            Stream inStream = socket.InputStream.AsStreamForRead();
            int bufferLength = inStream.ReadByte();
            byte[] serializedMessage = new byte[bufferLength];

            await inStream.ReadAsync(serializedMessage, 0, bufferLength);
            await inStream.FlushAsync();

            Library.Message incomingMessage;
            using (var stream = new MemoryStream(serializedMessage))
            {
                var serializer = new XmlSerializer(typeof(Library.Message));
                incomingMessage = (Library.Message)serializer.Deserialize(stream);
            }
            return incomingMessage;
        }
        catch (Exception exc)
        {
            Debug.WriteLine("Read failed with error: " + exc.Message);
            return null;
        }
    }

    private byte[] serialize(Message msg)
    {
        byte[] serializedMessage, returnArray;
        var serializer = new XmlSerializer(typeof(Library.Message));

        using (var stream = new MemoryStream())
        {
            serializer.Serialize(stream, msg);
            serializedMessage = stream.ToArray();
            stream.Dispose();
        }

        int bufferLength = serializedMessage.Length;

        returnArray = new byte[serializedMessage.Length + 1];
        serializedMessage.CopyTo(returnArray, 1);
        returnArray[0] = (byte)bufferLength;

        Debug.WriteLine("ClientReturnArrayLength: " + returnArray.Length);
        Debug.WriteLine("ClientFirstByte: " + returnArray[0]);
        return returnArray;
    }

服务器端看起来像这样

    public Task DispatcherPriority { get; private set; }

    public MainPage()
    {
        this.InitializeComponent();
        dispatcher = CoreWindow.GetForCurrentThread().Dispatcher;
        hostName = new HostName(getLocalIpAddress());
        clients = new List<Client>();
    }

    /// <summary>
    /// Gets the ip address of the host
    /// </summary>
    /// <returns></returns>
    private string getLocalIpAddress()
    {
        var icp = NetworkInformation.GetInternetConnectionProfile();

        if (icp?.NetworkAdapter == null) return null;
        var hostname =
            NetworkInformation.GetHostNames()
                .SingleOrDefault(
                    hn =>
                        hn.IPInformation?.NetworkAdapter != null && hn.IPInformation.NetworkAdapter.NetworkAdapterId
                        == icp.NetworkAdapter.NetworkAdapterId);

        // the ip address
        return hostname?.CanonicalName;
    }


    async void setupSocketListener()
    {

        if (socketListener != null)
        {
            await socketListener.CancelIOAsync();
            socketListener.Dispose();
            socketListener = null;
        }
        socketListener = new StreamSocketListener();
        socketListener.Control.QualityOfService = SocketQualityOfService.LowLatency;
        socketListener.ConnectionReceived += SocketListener_ConnectionReceived;
        await socketListener.BindServiceNameAsync("1337");
        listBox.Items.Add("server started.");

        clients.Clear();
    }

    private async void SocketListener_ConnectionReceived(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
    {
        HostName ip = args.Socket.Information.RemoteAddress;
        string port = args.Socket.Information.RemotePort;

        try
        {
            Stream inStream = args.Socket.InputStream.AsStreamForRead();
            int bufferLength = inStream.ReadByte();
            byte[] serializedMessage = new byte[bufferLength];

            await inStream.ReadAsync(serializedMessage, 0, bufferLength);
            await inStream.FlushAsync();

            Message incomingMessage;
            using (var stream = new MemoryStream(serializedMessage))
            {
                var serializer = new XmlSerializer(typeof(Message));
                incomingMessage = (Message)serializer.Deserialize(stream);
            }

            /// <summary>
            /// 1 = Connected
            /// 2 = SentNote
            /// 3 = Login
            /// </summary>
            switch (incomingMessage.Mode)
            {
                case 1:
                    onClientConnect(ip, port, incomingMessage.Username, args.Socket);
                    break;
                case 2:
                    onNoteReceived(incomingMessage);
                    break;
                case 3:
                    //handle login
                    break;
            }
        }
        catch (Exception msg)
        {
            Debug.WriteLine(msg);
        }

    }

    private async void onNoteReceived(Message msg)
    {
        foreach (var client in clients)
        {
            //if (client.Username != msg.Username)
            //{
            using (DataWriter writer = new DataWriter(client.Socket.OutputStream))
            {
                writer.WriteBytes(serialize(msg));
                await writer.StoreAsync();
                writer.DetachStream();
                writer.Dispose();
            }
            //}
        }
    }

    private void buttonStartServer_Click(object sender, RoutedEventArgs e)
    {
        setupSocketListener();
    }


    private async void notifyClients(string username)
    {
        Message msg = new Message();
        msg.Username = username;

        foreach (var client in clients)
        {
            //if (client.Username != msg.Username)
            //{
            using (DataWriter writer = new DataWriter(client.Socket.OutputStream))
            {
                writer.WriteBytes(serialize(msg));
                await writer.StoreAsync();
                writer.DetachStream();
                writer.Dispose();
            }
            //}
        }
    }

    private async void onClientConnect(HostName ip, string port, string username, StreamSocket socket)
    {
        clients.Add(new Client(ip, port, username, socket));

        await dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
        {
            listBox.Items.Add("User: " + username + " on IP " + ip + " is connected.");
        });

        notifyClients(username);
    }


    private byte[] serialize(Message msg)
    {
        byte[] serializedMessage, returnArray;
        var serializer = new XmlSerializer(typeof(Message));

        using (var stream = new MemoryStream())
        {
            serializer.Serialize(stream, msg);
            serializedMessage = stream.ToArray();
            stream.Dispose();
        }

        int bufferLength = serializedMessage.Length;

        returnArray = new byte[serializedMessage.Length + 1];
        serializedMessage.CopyTo(returnArray, 1);
        returnArray[0] = (byte)bufferLength;

        Debug.WriteLine("ClientReturnArrayLength: " + returnArray.Length);
        Debug.WriteLine("ClientFirstByte: " + returnArray[0]);
        return returnArray;
    }


}

这是我用来通过网络发送的消息 class。

    public string Username { get; set; }

    /// <summary>
    /// 1 = startConnection
    /// 2 = SendNote
    /// 3 = Login
    /// </summary>
    public int Mode { get; set; }
    public Piano PianoNote { get; set; }
    public string Instrument { get; set; }

    public Message()
    {

    }

}

public enum Piano { a1, a1s, b1, c1 };

**编辑:**

消息框架:

byte[] prefix = BitConverter.GetBytes(serializedMessage.Length);
returnArray = new byte[serializedMessage.Length + prefix.Length];
prefix.CopyTo(returnArray, 0);
serializedMessage.CopyTo(returnArray, prefix.Length);

正在阅读消息:

byte[] prefix = new byte[4];
await inStream.ReadAsync(prefix, 0, 4);
int bufferLength = BitConverter.ToInt32(prefix, 0);

半开: 我没有读取同步,而是切换为异步读取前 4 个字节,如上所示。

I am able to initially connect to the server (ConnectAsync) and following to write and read the stream. If the client sends another stream to the server using the method WriteToServer the event on the server side is not being triggered (SocketListener_ConnectionReceived).

好好看看这些名字。您调用 ConnectAsync 一次,然后调用 WriteToServer 两次,并且只看到 SocketListener_ConnectionReceived 一次。只有一次连接,所以是的,ConnectionReceived只会触发一次。

不过,这只是冰山一角。此代码还有其他一些非常微妙的问题。

正如我在我的博客中所描述的那样,一个突出的问题是缺少适当的 message framing。您使用的是单字节长度前缀,因此在线上它没问题(尽管限制为 256 个字节,这与 XML 相距不远)。但是消息的阅读是不正确的;特别是,Stream.ReadAsync 可能读取 1bufferLength 字节之间,或者它可能 return 0.

另一个问题是它受制于 half-open problem,正如我在我的博客中所描述的那样。特别是,int bufferLength = inStream.ReadByte(); 将在半开情况下无限期阻塞。您应该只对所有网络流使用异步方法,并在等待数据到达时定期写入

总而言之,我强烈建议您使用self-hosted SignalR而不是原始套接字。原始套接字编程极难正确执行,尤其是因为不正确的代码经常碰巧在本地环境中正常工作。