C++ websocket 服务器处理消息碎片

C++ websocket server handling message fragmentation

我正在尝试通过 websocket 从一个 javascript/html 客户端向另一个客户端发送图像。问题是服务器没有正确接收图像。我将所有图像作为文本中的数据 URI 发送,这样当 javascript 客户端收到它时,它可以简单地将 img 的 src 设置为 URI。问题(我相信)来自我处理消息碎片的方式。发送简单的短信效果很好,所以我相信是消息的大小导致了问题,唯一的主要代码差异是我处理消息碎片的方式。从 this 文档中,我被引导相信所有必须做的就是揭开每个碎片帧的有效负载并将缓冲区连接在一起。服务器上读取的 URI 比图像的实际数据 URI 足够短。在客户端,我所做的就是调用 socket.send() 函数。我已经确认我在 javascript FileReader 中读取的数据 URI 是正确的(在客户端)。

    int wSock::readData(/*input socket data buffer*/ char ** sockp, /*output payload*/ char ** buffer, /*output payload info*/ WebSocketFrameData * data) {
    char * sock = *sockp;
    if (!webSocketIsOpened(sock)) return -32; //checks if the socket is open
    u_long package_size;
    SOCKET socket;
    size_t dataRead = 0;
    size_t dr = 0;
    size_t firstLength = 0;
    memcpy_s(&socket, 4, sock, 4);
    ioctlsocket(socket, FIONREAD, &package_size);
    if (package_size <= 0) return 1;
    char * buf = new char[package_size + 1];
    while (dataRead < package_size) {
        dr = recv(socket, buf + dataRead, package_size - dataRead, NULL);
        if (dr == SOCKET_ERROR) {
            delete[] buf; 
            return WSAGetLastError();
        }
        dataRead += dr;
    }
    *(buf + package_size) = '[=10=]';
    if (package_size > 0) {
        decodeFrame(buf, buffer, &firstLength);
        if (data != NULL) {
            data->payloadLength = firstLength;
            data->opcode = *buf & 0b00001111;
        }
    }
    else return 1;

    // code handling other opcodes such as a close frame or a ping
    
    char fin = (*buf) >> 7;
    if (!fin) { //start handling message fragmentation
        printf("Fragmentation! \n");
        FD_SET tempRead;
        size_t totalLength = firstLength -1; //firstLength includes the null terminator
        char * combinedPayloads = new char[totalLength];
        memcpy_s(combinedPayloads, totalLength, *buffer, totalLength);
        printf("First frage of size: %u \n", totalLength);
        while (fin != 1) {
            FD_ZERO(&tempRead);
            FD_SET(socket, &tempRead);
            select(0, &tempRead, NULL, NULL, NULL);

            package_size = 0;
            ioctlsocket(socket, FIONREAD, &package_size);
            printf("Reading next frag of size: %u \n", package_size);
            char * contBuf = new char[package_size];
            dataRead = 0;
            while (dataRead < package_size) {
                dr = recv(socket, contBuf + dataRead, package_size - dataRead, NULL);
                if (dr == SOCKET_ERROR) { 
                    delete[] contBuf; 
                    return WSAGetLastError();
                }
                dataRead += dr;
            }
            char * payload;
            size_t payloadLength = 0;
            decodeFrame(contBuf, &payload, &payloadLength);
            payloadLength--; //the output payloadLength from the decodeFrame function includes a null terminator
            char * backBuffer = new char[totalLength];
            memcpy_s(backBuffer, totalLength, combinedPayloads, totalLength);
            delete[] combinedPayloads;

            combinedPayloads = new char[totalLength + payloadLength];
            memcpy_s(combinedPayloads, totalLength, backBuffer, totalLength);
            memcpy_s(combinedPayloads + totalLength, payloadLength, payload, payloadLength);
            fin = contBuf[0] >> 7;
            totalLength += payloadLength;
            delete[] backBuffer;
            delete[] contBuf;
            delete[] payload;
            if (fin) break;
        }
        delete[] *buffer;
        *buffer = new char[totalLength + 1];
        memcpy_s(*buffer, totalLength, combinedPayloads, totalLength);
        (*buffer)[totalLength] = '[=10=]';
        delete[] combinedPayloads;
        data->payloadLength = totalLength;
        printf("Finished fragment! Total size: %u \n", totalLength);
    }
    delete[] buf;
    return 0;
}

这是解码每个 websocket 帧的代码。正如我提到的,服务器可以很好地处理较小的聊天消息,所以我认为问题是消息重新组装,但我将包含 decodeFrame 函数,希望它能很好地帮助理解。

    int wSock::decodeFrame(char * message, char ** output, size_t * payloadLength)
{
    char read;
    memcpy_s(&read, 1, message + 1, 1);
    unsigned long long size = read & 0b01111111;
    //takes bits 9 - 15;
    int lastByte = 2;
    if (size == 126) {
        unsigned short rr;
        memcpy_s(&rr, 2, message + 2, 2);
        size = ntohs(rr);
        lastByte = 4;
    }
    else if (size == 127) {
        unsigned long long data;
        memcpy_s(&data, 8, message + 2, 8);
        size = ntohll(data);
        lastByte = 10;
    }
    if(payloadLength != NULL)
        *payloadLength = size + 1;
    char mask[4];
    memcpy_s(mask, 4, message + lastByte, 4);
    *output = new char[(size + 1)];
    lastByte += 4;
    for (int i = 0; i < size; i++) {
        (*output)[i] = message[lastByte + i] ^ mask.mask[i % 4];
    }
    (*output)[size] = '[=11=]';
    return 0;
}

在服务器端进行调试,我把读取的消息写到一个文本文件中。然而,写入的 URI 只有大约 4,000 - 6,000 个字符长,最后 200 - 400 个字符不是有效的 base64 字符,但是这些无效字符 do 之前的字符与其对应的字符匹配在真实数据 URI 上。重新组装过程中的 printf 语句将倾向于读取大约 262,368 个字节(总计),而实际 URI 的长度为 389,906 个字符。读取 URI 后,服务器将其发送给客户端,这会导致它们断开连接。所以正如我提到的,我的猜测是当我重新组装数据帧时出现问题。任何帮助将不胜感激。

ioctlsocket(socket, FIONREAD, &package_size);
  • FIONREAD returns 可以无阻塞读取的字节数。这意味着这行代码后面的 recv() 循环是完全无效的 One recv() will 读取该数据量.不能。

  • 您也没有正确处理流结束(recv() returns 零)。

好的,我明白了。我忘记考虑的是 TCP 消息碎片。正如@EJP 所提到的,ioctlsocket returns 只有 one 单个 recv() 调用中可以读取的字节数。我将收到的每个数据片段都视为自己的 WebSocket 帧,但情况并非总是如此。通常(几乎所有时间)单个 recv() 调用只会读取部分帧,并且第一帧的下一部分将在第二个 recv() 时与第二帧的第一部分一起读取称呼。那么第二个缓冲区(现在是两个不同的不完整帧的混合)显然不会被正确地去除掩码并且解码的大小将是不正确的。 javascript 客户端将每个 WebSocket 帧分段为大约 131K 字节,底层 TCP 层会将这些帧进一步分段为大约 65K 字节的数据包。所以我所做的是在一次 recv() 调用中接收到所有可能的数据,然后使用如下函数:

    unsigned long long wSock::decodeTotalFrameSize(char * frame)
{
    char secondByte = 0;
    memcpy_s(&secondByte, 1, frame + 1, 1);
    unsigned long long size = secondByte & 0b01111111;
    int headerSize = 2 + 4;
    if (size == 126) {
        unsigned short length;
        memcpy_s(&length, 2, frame + 2, 2);
        size = ntohs(length);
        headerSize += 2;
    }
    else if (size == 127) {
        unsigned long long length;
        memcpy_s(&length, 8, frame + 2, 8);
        size = ntohll(length);
        headerSize += 8;
    }
    return size + headerSize;
}

获取总的 WebSocket 帧大小。然后循环,直到您将那个字节数读入一个帧。类似于:

            FD_ZERO(&tempRead);
            FD_SET(socket, &tempRead);
            select(0, &tempRead, NULL, NULL, NULL);

            package_size = 0;
            ioctlsocket(socket, FIONREAD, &package_size);
            char * contBuf = new char[package_size];
            dataRead = 0;
            dr = recv(socket, contBuf, package_size, NULL);
            if (dr == SOCKET_ERROR) { 
                delete[] contBuf; 
                return WSAGetLastError();
            }
            unsigned long long actualSize = decodeTotalFrameSize(contBuf);
            if (package_size < actualSize) {
                char * backBuffer = new char[package_size];
                memcpy_s(backBuffer, package_size, contBuf, package_size);
                delete[] contBuf;
                contBuf = new char[actualSize];
                memcpy_s(contBuf, actualSize, backBuffer, package_size);
                delete[] backBuffer;
                dataRead = package_size;
                dr = 0;
                while (dataRead < actualSize) {
                    dr = recv(socket, contBuf + dataRead, actualSize - dataRead, NULL);
                    if (dr == SOCKET_ERROR) {
                        delete[] contBuf;
                        return WSAGetLastError();
                    }
                    else if (dr == 0) break;
                    dataRead += dr;
                }
                printf("Read total frag of %u \n", dataRead);

            }