为什么我的 Socket InputStream 会丢失字节?

Why am I losing bytes from my Socket InputStream?

我正在创建一个基于套接字的系统,用于桌面和移动设备之间的通信。我正在使用一个简单的协议来将数据读取和写入设备之间的流,最终以字节结束:

  1. 前4个字节代表一个整数,它定义了要执行的命令类型
  2. 接下来的4个字节代表一个整数,也就是流中其余字节的长度
  3. 剩下的字节代表payload数据,其总数对应(2.)的结果

我能够成功剥离前 4 个字节并解析命令 ok,然后剥离接下来的 4 个字节并正确解析长度。 当我剥离剩余的字节时,问题就来了,其中一些丢失了,并且它们从剩余数据的前面丢失了。

例如;如果命令为 1,长度为 50,则流中应该还剩 50 个字节,但只有 46 个字节,缺少 0-3 字节。

起始数据如下:

将其转换为字节数组后,我得到:

"\u0001[=56=][=56=][=56=][=56=][=56=]C:\Users\dave\Music\Offaiah-Trouble_(Club_Mix).mp3"

(问题 - 为什么第一个整数前面有 \u000 而第二个没有?)

这是我用来解析此代码的一些片段:

IBuffer inbuffer = new Windows.Storage.Streams.Buffer(4);
await _socket.InputStream.ReadAsync(inbuffer, 4, InputStreamOptions.None);
int command = BitConverter.ToInt32(inbuffer.ToArray(), 0);

此时的缓冲区包含:“\u0001[=58=][=58=][=58=]”,BitConverter 将其解析为 1

inbuffer = new Windows.Storage.Streams.Buffer(4);
await _socket.InputStream.ReadAsync(inbuffer, 4, InputStreamOptions.None);
int length = BitConverter.ToInt32(inbuffer.ToArray(), 0);

输入缓冲区现在包含:“2[=59=][=59=][=59=]”,BitConverter 将其解析为“50”

inbuffer = new Windows.Storage.Streams.Buffer((uint)length);
await _socket.InputStream.ReadAsync(inbuffer, (uint)length, InputStreamOptions.Partial);
string path = Encoding.UTF8.GetString(inbuffer.ToArray());

缓冲区现在包含:"sers\dave\Music\Offaiah-Trouble_(Club_Mix).mp3"

前面失踪的"C:\U"去哪儿了?

所以,我意识到我一直被否决,因为这个问题不简洁且不可重现。所以我创建了一个小项目来演示这部分问题,具有讽刺意味的是问题就消失了。

代码如下:

public sealed partial class MainPage : Page
{
    private StreamSocketListener _listener;
    private StreamSocket _client;
    private StreamSocket _socket;
    private HostName _host;
    private int _port = 54321;

    public MainPage()
    {
        InitializeComponent();
        _listener = new StreamSocketListener();
        _listener.ConnectionReceived += async (sender, args) =>
        {
            _socket = args.Socket;
            await Receive();
        };
        _host = NetworkInformation.GetHostNames().FirstOrDefault(x => x.IPInformation != null && x.Type == HostNameType.Ipv4);
    }

    protected async override void OnNavigatedTo(NavigationEventArgs e)
    {
        base.OnNavigatedTo(e);
        await _listener.BindEndpointAsync(_host, $"{_port}");
        await Task.Run(async () =>
        {
            _client = new StreamSocket();
            await _client.ConnectAsync(_host, $"{_port}");

            int command = 1;
            byte[] cmd = BitConverter.GetBytes(command);
            byte[] path = Encoding.UTF8.GetBytes(@"C:\Users\Dave\Music\Offaiah-Trouble_(Club_Mix).mp3");
            byte[] length = BitConverter.GetBytes(path.Length);
            byte[] result = cmd.Concat(length.Concat(path)).ToArray();
            await _client.OutputStream.WriteAsync(result.AsBuffer());
        });
    }

    private async Task Receive()
    {
        while (true)
        {
            IBuffer inbuffer = new Windows.Storage.Streams.Buffer(4);
            await _socket.InputStream.ReadAsync(inbuffer, 4, InputStreamOptions.None);
            int command = BitConverter.ToInt32(inbuffer.ToArray(), 0);
            //The inbuffer at this point contains: "\u0001[=10=][=10=][=10=]", and the BitConverter resolves this to 1

            inbuffer = new Windows.Storage.Streams.Buffer(4);
            await _socket.InputStream.ReadAsync(inbuffer, 4, InputStreamOptions.None);
            int length = BitConverter.ToInt32(inbuffer.ToArray(), 0);
            //The inbuffer now contains: "2[=10=][=10=][=10=]", and the BitConverter resolves this to "50"

            inbuffer = new Windows.Storage.Streams.Buffer((uint)length);
            await _socket.InputStream.ReadAsync(inbuffer, (uint)length, InputStreamOptions.Partial);
            string path = Encoding.UTF8.GetString(inbuffer.ToArray());
        }
    }
}

如果您创建一个新的空白通用项目并运行这个,您会看到它产生了正确的输出。

最终我发现我在这个流 reader 和另一个流之间存在竞争条件。在主项目中,我并没有一次读取所有字节。我有一个单独的方法读取和解析 "command",然后将控制权传递给另一种方法,以将控制重定向到几种工作方法之一来为该特定命令提供服务——去掉长度,然后去掉其余的有效负载。

问题是 'command reader' 然后从流中读取另外 4 个字节,然后工作人员才能读取它的有效负载。

显然答案是它应该在这里暂停并等待工作人员完成,但我一直在使用异步和等待,所以我遇到这个问题让我感到惊讶。原因是缺少 await 和可怕的 async void,如下所示..

违规代码:

private async Task Listen()
{
    while (true)
    {
        //expects a 4 byte packet representing a command
        Debug.WriteLine($"Listening for socket command...");
        IBuffer inbuffer = new Windows.Storage.Streams.Buffer(4);
        await _socket.InputStream.ReadAsync(inbuffer, 4, InputStreamOptions.None);
        int command = BitConverter.ToInt32(inbuffer.ToArray(), 0);

        Debug.WriteLine($"Command received: {command}");
        ParseCommand(command);
    }
}

private async void ParseCommand(int command)
{
    //...
}

...以及修改后的版本:

private async Task Listen()
{
    while (true)
    {
        //expects a 4 byte packet representing a command
        Debug.WriteLine($"Listening for socket command...");
        IBuffer inbuffer = new Windows.Storage.Streams.Buffer(4);
        await _socket.InputStream.ReadAsync(inbuffer, 4, InputStreamOptions.None);
        int command = BitConverter.ToInt32(inbuffer.ToArray(), 0);

        Debug.WriteLine($"Command received: {command}");
        await ParseCommand(command);
    }
}

private async Task ParseCommand(int command)
{
    //...
}