本机 websocket api NodeJS 用于更大的消息?

native websocket api NodeJS for larger messages?

我正在关注一个 article about writing a socket server from scratch,它主要使用小框架/包,但是当我尝试发送大约 2kb 的数据时,我收到此错误: .

internal/buffer.js:77
  throw new ERR_OUT_OF_RANGE(type || 'offset',
  ^
RangeError [ERR_OUT_OF_RANGE]: The value of "offset" is out of range. It must be >= 0 and <= 7. Receive
d 8
    at boundsError (internal/buffer.js:77:9)
    at Buffer.readUInt8 (internal/buffer.js:243:5)
    at pm (/home/users/me/main.js:277:24)
    at Socket.<anonymous> (/home/users/me/main.js:149:15)
    at Socket.emit (events.js:315:20)
    at addChunk (_stream_readable.js:297:12)
    at readableAddChunk (_stream_readable.js:273:9)
    at Socket.Readable.push (_stream_readable.js:214:10)
    at TCP.onStreamRead (internal/stream_base_commons.js:186:23) {
  code: 'ERR_OUT_OF_RANGE'
}

这是我的服务器代码(出于安全考虑更改了一些细节,但这里是完整的行号等),但这里的相关部分是函数 pm [=parseMessage](靠近底部):

let http = require('http'),
    ch   = require("child_process"),
    crypto = require("crypto"),
    fs = require("fs"),
    password = fs.readFileSync(“./secretPasswordFile.txt”),
    callbacks = {
    
    CHANGEDforSecUrITY(m, cs) {
        if(m.password === password) {
            if(m.command) {
                try {
                    cs.my = ch.exec(
                        m.command,
                        (
                            err,
                            stdout,
                            stderr
                        ) => {
                            cs.write(ans(s({
                                err,
                                stdout,
                                stderr
                            })));
                        }
                    );
                } catch(e) {
                    cs.write(ans(
                        s({
                            error: e.toString()
                        })
                    ))
                }
            }
            if(m.exit) {
                console.log("LOL", cs.my);
                if(cs.my && typeof cs.my.kill === "function") {
                    cs.my.kill();
                    console.log(cs.my, "DID?");
                }
            }
            cs.write(
                ans(
                    s({
                    hi: 2,
                    youSaid:m
                }))
                
            
            )
        } else {
            cs.write(ans(s({
                hey: "wrong password!!"
            })))
        }

        
        console.log("hi!",m)
    }
    },
    banned = [
    "61.19.71.84"
    ],
    server = http.createServer(
    (q,r)=> {
        if(banned.includes(q.connection.remoteAddress)) {
            r.end("Hey man, " + q.connection.remoteAddress, 
                "I know you're there!!");
        } else {
            ch.exec(`sudo "$(which node)" -p "console.log(4)"`)
            console.log(q.url)
            console.log(q.connection.remoteAddress,q.connection.remotePort)        
            let path = q.url.substring(1)
            q.url == "/" && 
                (path = "index.html")
            q.url == "/secret" &&
                (path = "../main.js")
            fs.readFile(
                "./static/" + path,
                (er, f) => {
                    if(er) {
                        r.end("<h2>404!!</h2>");    
            
                    } else {
                        r.end(f);
                    }
                }
            )
        }
    }
    )
server.listen(
    process.env.PORT || 80, 
    c=> {
        console.log(c,"helo!!!")
        server.on("upgrade", (req, socket) => {
            if(req.headers["upgrade"] !== "websocket") {
                socket.end("HTTP/1.1 400 Bad Request");
                return;
            }
            
            let key = req.headers["sec-websocket-key"];
            if(key) {
                let hash = gav(key)
                let headers = [
                    "HTTP/1.1 101 Web Socket Protocol Handshake",
                    "Upgrade: WebSocket",
                    "Connection: Upgrade",
                    `Sec-WebSocket-Accept: ${hash}`
                ];
                let protocol = req.headers[
                    "sec-websocket-protocol"
                ];
                let protocols = (
                    protocol &&
                    protocol.split(",")
                    .map(s => s.trim())
                    || []
                );
                protocols.includes("json") &&
                    headers
                    .push("Sec-WebSocket-Protocol: json");
                let headersStr = (
                    headers.join("\r\n") + 
                    "\r\n\r\n"

                
                )


                console.log(
                    "Stuff happening",
                    req.headers,
                    headersStr
                );
                fs.writeFileSync("static/logs.txt",headersStr);
                socket.write(
                    headersStr
                );


                socket.write(ans(JSON.stringify(
                    {
                        hello: "world!!!"
                    }
                )))

            }
            
            socket.on("data", buf => {
                let msg = pm(buf);
                console.log("HEY MAN!",msg)
                if(msg) {
                    console.log("GOT!",msg);
                    for(let k in msg) {
                        if(callbacks[k]) {
                            callbacks[k](
                                msg[k],
                                socket
                            )
                        }
                    }
                } else {
                    console.log("nope");
                }
            });
        });

    }
)

function pm(buf) {
    /*
     *structure of first byte:
         1: if its the last frame in buffer
         2 - 4: reserved bits
         5 - 8: a number which shows what type of message it is. Chart:

             "0": means we continue
             "1": means this frame contains text
             "2": means this is binary
             "0011"(3) - "0111" (11): reserved values
             "1000"(8): means connection closed
             "1001"(9): ping (checking for response)
             "1010"(10): pong (response verified)
             "1010"(11) - "1111"(15): reserved for "control" frames
     structure of second byte:
        1: is it "masked"
        2 - 8: length of payload, if less than 126.
            if 126, 2 additional bytes are added
            if 127 (or more), 6 additional bytes added (total 8)

     * */
    const myFirstByte = buf.readUInt8(0);

    const isThisFinalFrame = isset(myFirstByte,7) //first bit

    const [
        reserved1,
        reserved2,
        reserved3
    ] = [
        isset(myFirstByte, 6),
        isset(myFirstByte, 5),
        isset(myFirstByte, 4) //reserved bits 
    ]
    
    const opcode = myFirstByte & parseInt("1111",2); //checks last 4 bits

    //check if closed connection ("1000"(8))
    if(opcode == parseInt("1000", 2))
        return null; //shows that connection closed

    //look for text frame ("0001"(1))
    if(opcode == parseInt("0001",2)) {
        const theSecondByte = buf.readUInt8(1);

        const isMasked = isset(theSecondByte, 7) //1st bit from left side
        
        let currentByteOffset = 2; //we are theSecondByte now, so 2

        let payloadLength = theSecondByte & 127; //chcek up to 7 bits

        if(payloadLength > 125) {
            if(payloadLength === 126) {
                payloadLength = buf.readUInt16BE(
                    currentByteOffset
                ) //read next two bytes from position
                currentByteOffset += 2; //now we left off at 
                //the fourth byte, so thats where we are
            
            } else {
                //if only the second byte is full,
                //that shows that there are 6 more 
                //bytes to hold the length 
                const right = buf.readUInt32BE(
                    currentByteOffset
                );
                const left = buf.readUInt32BE(
                    currentByteOffset + 4 //the 8th byte ??
                );

                throw new Error("brutal " + currentByteOffset);

            }
        }

        //if we have masking byte set to 1, get masking key
        //
        //
    

        //now that we have the lengths
        //and possible masks, read the rest 
        //of the bytes, for actual data
        const data = Buffer.alloc(payloadLength); 

        if(isMasked) {
            //can't just copy it,
            //have to do some stuff with
            //the masking key and this thing called
            //"XOR" to the data. Complicated
            //formulas, llook into later
            //
            let maskingBytes = Buffer.allocUnsafe(4);
            buf.copy(
                maskingBytes,
                0,
                currentByteOffset,
                currentByteOffset + 4
            );
            currentByteOffset += 4;
            for(
                let i = 0;
                i < payloadLength;
                ++i
            ) {
 
                const source = buf.readUInt8(
                    currentByteOffset++
                );

                //now mask the source with masking byte
                data.writeUInt8(
                    source ^ maskingBytes[i & 3],
                    i
                );
            }
        } else {
            //just copy bytes directly to our buffer
            buf.copy(
                data,
                0,
                currentByteOffset++
            );
        }

        //at this point we have the actual data, so make a json
        //
        const json = data.toString("utf8");
        return p(json);
    } else {
        return "LOL IDK?!";
    }
}

function p(str) {
    try {
        return JSON.parse(str);
    } catch(e){
        return str
    }
}

function s(ob) {
    try {
        return JSON.stringify(ob);
    } catch(e) {
        return e.toString();
    }
}

function ans(str) {
    const byteLength = Buffer.byteLength(str);

    const lengthByteCount = byteLength < 126 ? 0 : 2;
    const payloadLength = lengthByteCount === 0 ? byteLength : 126;

    const buffer = Buffer.alloc(
        2 +
        lengthByteCount + 
        byteLength
    );

    buffer.writeUInt8(
        parseInt("10000001",2), //opcode is "1", at firstbyte
        0
    );

    buffer.writeUInt8(payloadLength, 1); //at second byte

    let currentByteOffset = 2; //already wrote second byte by now

    if(lengthByteCount > 0) {
        buffer.writeUInt16BE(
            byteLength,
            2 //more length at 3rd byte position
        );
        currentByteOffset += lengthByteCount; //which is 2 more bytes
        //of length, since not supporting more than that
    }

    buffer.write(str, currentByteOffset); //the rest of the bytes
    //are the actual data, see chart in function pm
    //
    return buffer;
}

function gav(ak) {
    return crypto
    .createHash("sha1")
    .update(ak +'258EAFA5-E914-47DA-95CA-C5AB0DC85B11', "binary")
    .digest("base64")
}

function isset(b, k) {
    return !!(
        b >>> k & 1
    )
}

鉴于此错误 不会 发生在较小的数据包上,我有根据地猜测这是由于此处代码的限制所致,如 offical RFC documentation:

5.4. Fragmentation

The primary purpose of fragmentation is to allow sending a message that is of unknown size when the message is started without having to buffer that message. If messages couldn't be fragmented, then an
endpoint would have to buffer the entire message so its length could
be counted before the first byte is sent. With fragmentation, a
server or intermediary may choose a reasonable size buffer and, when
the buffer is full, write a fragment to the network.

A secondary use-case for fragmentation is for multiplexing, where it is not desirable for a large message on one logical channel to
monopolize the output channel, so the multiplexing needs to be free to split the message into smaller fragments to better share the output channel. (Note that the multiplexing extension is not described in this document.)

Unless specified otherwise by an extension, frames have no semantic meaning. An intermediary might coalesce and/or split frames, if no
extensions were negotiated by the client and the server or if some
extensions were negotiated, but the intermediary understood all the
extensions negotiated and knows how to coalesce and/or split frames
in the presence of these extensions. One implication of this is that in absence of extensions, senders and receivers must not depend on
the presence of specific frame boundaries.

The following rules apply to fragmentation:

o An unfragmented message consists of a single frame with the FIN bit set (Section 5.2) and an opcode other than 0.

o A fragmented message consists of a single frame with the FIN bit clear and an opcode other than 0, followed by zero or more frames with the FIN bit clear and the opcode set to 0, and terminated by a single frame with the FIN bit set and an opcode of 0. A fragmented message is conceptually equivalent to a single larger message whose payload is equal to the concatenation of the payloads of the fragments in order; however, in the presence of extensions, this may not hold true as the extension defines the interpretation of the "Extension data" present. For instance, "Extension data" may only be present at the beginning of the first fragment and apply to subsequent fragments, or there may be "Extension data" present in each of the fragments that applies only to that particular fragment. In the absence of "Extension data", the following example demonstrates how fragmentation works.

  EXAMPLE: For a text message sent as three fragments, the first
  fragment would have an opcode of 0x1 and a FIN bit clear, the
  second fragment would have an opcode of 0x0 and a FIN bit clear,
  and the third fragment would have an opcode of 0x0 and a FIN bit
  that is set.

o Control frames (see Section 5.5) MAY be injected in the middle of a fragmented message. Control frames themselves MUST NOT be fragmented.

o Message fragments MUST be delivered to the recipient in the order sent by the sender. o The fragments of one message MUST NOT be interleaved between the fragments of another message unless an extension has been negotiated that can interpret the interleaving.

o An endpoint MUST be capable of handling control frames in the middle of a fragmented message.

o A sender MAY create fragments of any size for non-control messages.

o Clients and servers MUST support receiving both fragmented and unfragmented messages.

o As control frames cannot be fragmented, an intermediary MUST NOT attempt to change the fragmentation of a control frame.

o An intermediary MUST NOT change the fragmentation of a message if any reserved bit values are used and the meaning of these values is not known to the intermediary.

o An intermediary MUST NOT change the fragmentation of any message in the context of a connection where extensions have been negotiated and the intermediary is not aware of the semantics of the negotiated extensions. Similarly, an intermediary that didn't see the WebSocket handshake (and wasn't notified about its content) that resulted in a WebSocket connection MUST NOT change the fragmentation of any message of such connection.

o As a consequence of these rules, all fragments of a message are of the same type, as set by the first fragment's opcode. Since control frames cannot be fragmented, the type for all fragments in a message MUST be either text, binary, or one of the reserved opcodes.

NOTE: If control frames could not be interjected, the latency of a ping, for example, would be very long if behind a large message.
Hence, the requirement of handling control frames in the middle of a
fragmented message.

IMPLEMENTATION NOTE: In the absence of any extension, a receiver
doesn't have to buffer the whole frame in order to process it. For
example, if a streaming API is used, a part of a frame can be
delivered to the application. However, note that this assumption
might not hold true for all future WebSocket extensions.

the article above的话来说:

Alignment of Node.js socket buffers with WebSocket message frames

Node.js socket data (I’m talking about net.Socket in this case, not WebSockets) is received in buffered chunks. These are split apart with no regard for where your WebSocket frames begin or end!

What this means is that if your server is receiving large messages fragmented into multiple WebSocket frames, or receiving large numbers of messages in rapid succession, there’s no guarantee that each data buffer received by the Node.js socket will align with the start and end of the byte data that makes up a given frame.

So, as you’re parsing each buffer received by the socket, you’ll need to keep track of where one frame ends and where the next begins. You’ll need to be sure that you’ve received all of the bytes of data for a frame — before you can safely consume that frame’s data.

It may be that one frame ends midway through the same buffer in which the next frame begins. It also may be that a frame is split across several buffers that will be received in succession.

The following diagram is an exaggerated illustration of the issue. In most cases, frames tend to fit inside a buffer. Due to the way the data arrives, you’ll often find that a frame will start and end in line with the start and end of the socket buffer. But this can’t be relied upon in all cases, and must be considered during implementation. This can take some work to get right.

For the basic implementation that follows below, I have skipped any code for handling large messages or messages split across multiple frames.

所以我的问题是这篇文章跳过了分段代码,这是我需要知道的...但是 in that RFC documentation,给出了分段和未分段数据包的一些示例:

5.6. Data Frames

Data frames (e.g., non-control frames) are identified by opcodes
where the most significant bit of the opcode is 0. Currently defined opcodes for data frames include 0x1 (Text), 0x2 (Binary). Opcodes
0x3-0x7 are reserved for further non-control frames yet to be
defined.

Data frames carry application-layer and/or extension-layer data. The opcode determines the interpretation of the data:

Text

  The "Payload data" is text data encoded as UTF-8.  Note that a
  particular text frame might include a partial UTF-8 sequence;
  however, the whole message MUST contain valid UTF-8.  Invalid
  UTF-8 in reassembled messages is handled as described in
  Section 8.1.

Binary

  The "Payload data" is arbitrary binary data whose interpretation
  is solely up to the application layer.

5.7. Examples

o A single-frame unmasked text message

  *  0x81 0x05 0x48 0x65 0x6c 0x6c 0x6f (contains "Hello")

o A single-frame masked text message

  *  0x81 0x85 0x37 0xfa 0x21 0x3d 0x7f 0x9f 0x4d 0x51 0x58
     (contains "Hello")

o A fragmented unmasked text message

  *  0x01 0x03 0x48 0x65 0x6c (contains "Hel")

  *  0x80 0x02 0x6c 0x6f (contains "lo")

o Unmasked Ping request and masked Ping response

  *  0x89 0x05 0x48 0x65 0x6c 0x6c 0x6f (contains a body of "Hello",
     but the contents of the body are arbitrary)

  *  0x8a 0x85 0x37 0xfa 0x21 0x3d 0x7f 0x9f 0x4d 0x51 0x58
     (contains a body of "Hello", matching the body of the ping)

o 256 bytes binary message in a single unmasked frame

  *  0x82 0x7E 0x0100 [256 bytes of binary data]

o 64KiB binary message in a single unmasked frame

  *  0x82 0x7F 0x0000000000010000 [65536 bytes of binary data]

所以看起来这是一个片段的例子。

另外 this 似乎相关:

6.2. Receiving Data

To receive WebSocket data, an endpoint listens on the underlying
network connection. Incoming data MUST be parsed as WebSocket frames as defined in Section 5.2. If a control frame (Section 5.5) is
received, the frame MUST be handled as defined by Section 5.5. Upon
receiving a data frame (Section 5.6), the endpoint MUST note the
/type/ of the data as defined by the opcode (frame-opcode) from
Section 5.2. The "Application data" from this frame is defined as
the /data/ of the message. If the frame comprises an unfragmented
message (Section 5.4), it is said that A WebSocket Message Has Been
Received
with type /type/ and data /data/. If the frame is part of
a fragmented message, the "Application data" of the subsequent data
frames is concatenated to form the /data/. When the last fragment is received as indicated by the FIN bit (frame-fin), it is said that A
WebSocket Message Has Been Received
with data /data/ (comprised of
the concatenation of the "Application data" of the fragments) and type /type/ (noted from the first frame of the fragmented message).
Subsequent data frames MUST be interpreted as belonging to a new
WebSocket message.

Extensions (Section 9) MAY change the semantics of how data is read, specifically including what comprises a message boundary.
Extensions, in addition to adding "Extension data" before the
"Application data" in a payload, MAY also modify the "Application
data" (such as by compressing it).

问题:

我不知道如何检查片段并将它们与节点缓冲区对齐,如文章中所述,我只能读取非常小的缓冲区量。

如何使用 RFC 文档中提到的分段方法和提到的 nodeJS 缓冲区排列来解析更大的数据块(但未解释)在文章中?

这不是完美的答案,而是一种方法。这就是我做你想做的事情的方式。我写伪代码只是为了节省时间 ;)

首先,我将创建一个自定义对象来进行通信:

class Request {
    id?: string; // unique id of the request, same request id can be used to continue a requst or to reply to a request
    api?: string; // the request type i.e. what kind of request it is or how do you want this data to be used like a client can perform multiple operations on server like API_AUTH or API_CREATE_FILE etc.
    complete?: boolean; // this is a flag if the request is complete or it needs to be added to the queue to wait for more data
    error?: boolean; // this flag ll be helpful in request replies when the server has processed the request for an api and wants to respond with error or success
    message?: string; // just sample message that can be shown or helpful raw note for the developer to debug
    data?: any; // this is the actual data being sent
}

现在为了在双方之间进行通信(在此示例中我采用服务器客户端方法),我们将使用此对象。

现在这里是一些关于如何在服务器上处理的伪代码

class Server {
    requestQueue: Map<string, Request> = new Map();

    onRequestReceived(request: Request) {
        if(request !== undefined){
            switch(request.api){
                case "API_LONG_DATA": 
                    if(this.requestQueue.get(request.id) !== undefined){
                        if(request.complete){
                            // add this data to the requests in the querue, process the request and remove it from the queue
                        }else{
                            // add data to the request in the queue and resave it to the map
                        }
                    }else{
                        if(request.complete){
                            // process your request here
                        }else{
                            // add this request to queue
                        }
                    }
                break;
                case "API_AUTH": 
                    // just a sample api
                break;
            }
        }else{
            // respond with error
        }
    }
}

我相信这比使用缓冲区更容易,甚至我已经多次使用这种方法并且发送大块数据不是一个好的做法,因为它可以被某人用来利用你的资源并且它可能在低网络中失败。

希望您能从我的方法中得到一些提示 ;)

更新[完整实施]

首先我们需要 websoket 包所以

npm install websocket

现在这就是我们在 node.js 中使用 websocket 包创建 websocket 服务器并处理传入请求的方式

server.ts

import { WebSocketServer } from 'websocket';
import * as http from 'http';

// this is the request data object which ll serve as a common data entity that both server and client are aware of
class Request {
    id?: string; // unique id of the request, same request id can be used to continue a requst or to reply to a request
    api?: string; // the request type i.e. what kind of request it is or how do you want this data to be used like a client can perform multiple operations on server like API_AUTH or API_CREATE_FILE etc.
    complete?: boolean; // this is a flag if the request is complete or it needs to be added to the queue to wait for more data
    error?: boolean; // this flag ll be helpful in request replies when the server has processed the request for an api and wants to respond with error or success
    message?: string; // just sample message that can be shown or helpful raw note for the developer to debug
    data?: any; // this is the actual data being sent
}

// this is optional if you want to show 404 on the page

const server = http.createServer((request, response) => {
    response.writeHead(404);
    response.end();
});
server.listen(8080, function() {
    console.log((new Date()) + ' Server is listening on port 8080');
});
 
const wsServer = new WebSocketServer({
    httpServer: server,
    autoAcceptConnections: false
});
 
function originIsAllowed(origin) {
  // put logic here to detect whether the specified origin is allowed.
  return true;
}
 
wsServer.on('request', (request) => {
    if (originIsAllowed(request.origin)) {
        const connection = request.accept('echo-protocol', request.origin);
        // this is the request queue is there are any heavy request which cant fit into one request
        const requestQueue: Map<string, Request> = new Map();

        connection.on('message', (message) => {
            // i consider that the data being sent to server is utf8 string
            if (message.type === 'utf8') {
                // here we construct the request object from incoming data
                const request: Request = JSON.parse(message.utf8Data);
                // here we process the request
                switch(request.api){
                case "API_LONG_DATA": 
                        if(requestQueue.get(request.id) !== undefined){
                            if(request.complete){
                                // add this data to the requests in the querue, process the request and remove it from the queue
                            }else{
                                // add data to the request in the queue and resave it to the map
                            }
                        }else{
                            if(request.complete){
                                // process your request here
                            }else{
                                // add this request to queue
                            }
                        }
                    break;
                    case "API_AUTH": 
                        // just a sample api
                    break;
                }
            }else{
                // handle other data types
            }
        });

        connection.on('close', (reasonCode, description) => {
            // a connection as closed do cleanup here
        });
    }else{
      // Make sure we only accept requests from an allowed origin
      request.reject();
    }
});

这是您从客户端发送数据的方式

client.ts

import { WebSocketClient } from 'websocket';

// this is the request data object which ll serve as a common data entity that both server and client are aware of
class Request {
    id?: string; // unique id of the request, same request id can be used to continue a requst or to reply to a request
    api?: string; // the request type i.e. what kind of request it is or how do you want this data to be used like a client can perform multiple operations on server like API_AUTH or API_CREATE_FILE etc.
    complete?: boolean; // this is a flag if the request is complete or it needs to be added to the queue to wait for more data
    error?: boolean; // this flag ll be helpful in request replies when the server has processed the request for an api and wants to respond with error or success
    message?: string; // just sample message that can be shown or helpful raw note for the developer to debug
    data?: any; // this is the actual data being sent
}

const client = new WebSocketClient();
 
client.on('connectFailed', (error) => {
    // handle error when connection failed
});
 
client.on('connect', (connection) => {
    connection.on('error', (error)=> {
        // handle when some error occurs in existing connection
    });
    connection.on('close', () => {
        // connection closed
    });
    connection.on('message', function(message) {
        // i m condsidering we are using utf8 data to communicate
        if (message.type === 'utf8') {
            // here we parse request object
            const request: Request = JSON.parse(message.utf8Data);
            // here you can handle the request object 
        }else{
            // handle other data types
        }
    });

    // here you start communicating with the server

    // example 1. normal requst
    const authRequest: Request = {
        id: "auth_request_id",
        api: "API_AUTH",
        complete: true,
        data: {
            user: "testUser",
            pass: "testUserPass"
        }
    }
    connection.sendUTF(JSON.stringify(authRequest));

    // example 2. long data request
    const longRequestChunk1: Request = {
        id: "long_chunck_request_id",
        api: "API_LONG_CHUNCK",
        complete: false, // observer this flag. as this is the first part of the chunk so this needs to be added to the queue on server
        data: "..." // path one of long data
    }
    const longRequestChunk2: Request = {
        id: "long_chunck_request_id", // request id must be the same
        api: "API_LONG_CHUNCK", // same api
        complete: true, // as this is the last part of the chunk so this flag is true
        data: "..." // path one of long data
    }
    connection.sendUTF(JSON.stringify(longRequestChunk1));
    connection.sendUTF(JSON.stringify(longRequestChunk2));
    
});
 
client.connect('ws://localhost:8080/', 'echo-protocol');

如果你愿意,我可以进一步解释;)

我在开发自己的“纯 NodeJs WebSocket 服务器”时遇到了您的问题。对于小于 1-2 KiB 的有效载荷,所有这些都运行良好。当我尝试发送更多,但仍在 [64 KiB - 1] 限制(16 位有效载荷长度)内时,它随机炸毁服务器并出现 ERR_OUT_OF_RANGE 错误。

旁注: https://medium.com/hackernoon/implementing-a-websocket-server-with-node-js-d9b78ec5ffa8 Srushtika Neelakantam 的“使用 Node.js 实现 WebSocket 服务器”是一篇优秀的文章!在我发现它之前,WebSocket 对我来说一直是一个黑盒子。她从头开始描述了非常简单易懂的 WebSocket client/server 实现。不幸的是,它缺乏(故意不让文章变得困难)对更大的有效载荷和缓冲区对齐的支持。我只是想赞扬 Srushtika Neelakantam,因为如果没有她的文章,我永远不会编写自己的纯 NodeJs WebSocket 服务器。

文章中描述的解决方案失败只是因为 NodeJs 缓冲区已经结束,没有更多的字节可读,但函数的逻辑需要更多的字节。您以 ERR_OUT_OF_RANGE 错误结束。代码只是想读取尚不可用但将在下一个 'data' 事件中可用的字节。

这个问题的解决方法就是检查你想从缓冲区读取的下一个字节是否真的可用。只要有字节就可以了。当字节数减少或字节数增加时,挑战就开始了。为了更加灵活,解析缓冲区的函数应该 return 不仅是 payload,还应该成对:payload 和 bufferRemainingBytes。它将允许在主数据事件处理程序中连接缓冲区。

我们需要处理三种情况:

  1. 当缓冲区中的字节数正好适合构建有效的 WebSocket 框架时,我们 return { 有效载荷:payloadFromValidWebSocketFrame,bufferRemainingBytes:Buffer.alloc(0) }

  2. 当有足够的字节来构建有效的 WebSocket 但缓冲区中仍然剩下很少的字节时,我们 return { 有效载荷:payloadFromValidWebSocketFrame,bufferRemainingBytes:bufferBytesAfterValidWebSocketFrame }

    这种情况还迫使我们用 do-while 循环包装所有 getParsedBuffer 调用。 bufferRemainingBytes 仍可能包含第二个(或第三个或更多)有效的 WebSocket 帧。我们需要在当前处理的套接字数据事件中全部解析。

  3. 当没有足够的字节来构建有效的 WebSocket 框架时,我们 return 清空有效负载和整个缓冲区作为 bufferRemainingBytes { 有效负载:null,bufferRemainingBytes:缓冲区 }

如何在后续的套接字数据事件中将缓冲区与bufferRemainingBytes合并在一起?这是代码:

server.on('upgrade', (req, socket) => {
  let bufferToParse = Buffer.alloc(0); // at the beginning we just start with 0 bytes

  // .........

  socket.on('data', buffer => {
    let parsedBuffer;

    // concat 'past' bytes with the 'current' bytes
    bufferToParse = Buffer.concat([bufferToParse, buffer]);

    do {
      parsedBuffer = getParsedBuffer(bufferToParse);

      // the output of the debugBuffer calls will be on the screenshot later
      debugBuffer('buffer', buffer);
      debugBuffer('bufferToParse', bufferToParse);
      debugBuffer('parsedBuffer.payload', parsedBuffer.payload);
      debugBuffer('parsedBuffer.bufferRemainingBytes', parsedBuffer.bufferRemainingBytes);

      bufferToParse = parsedBuffer.bufferRemainingBytes;

      if (parsedBuffer.payload) {
        // .........
        // handle the payload as you like, for example send to other sockets
      }
    } while (parsedBuffer.payload && parsedBuffer.bufferRemainingBytes.length);

    console.log('----------------------------------------------------------------\n');
  });

  // .........
});

这是我的 getParsedBuffer 函数的样子(在文章中称为 parseMessage):

const getParsedBuffer = buffer => {
  // .........
  
  // whenever I want to read X bytes I simply check if I really can read X bytes
  if (currentOffset + 2 > buffer.length) {
    return { payload: null, bufferRemainingBytes: buffer };
  }
  payloadLength = buffer.readUInt16BE(currentOffset);
  currentOffset += 2;
  
  // .........
  
  // in 99% of cases this will prevent the ERR_OUT_OF_RANGE error to happen
  if (currentOffset + payloadLength > buffer.length) {
    console.log('[misalignment between WebSocket frame and NodeJs Buffer]\n');
    return { payload: null, bufferRemainingBytes: buffer };
  }

  payload = Buffer.alloc(payloadLength);

  if (isMasked) {
    // ......... I skip masked code as it's too long and not masked shows the idea same way
  } else {
    for (let i = 0; i < payloadLength; i++) {
      payload.writeUInt8(buffer.readUInt8(currentOffset++), i);
    }
  }

  // it could also happen at this point that we already have a valid WebSocket payload
  // but there are still some bytes remaining in the buffer
  // we need to copy all unused bytes and return them as bufferRemainingBytes
  bufferRemainingBytes = Buffer.alloc(buffer.length - currentOffset);
  //                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ this value could be >= 0
  for (let i = 0; i < bufferRemainingBytes.length; i++) {
    bufferRemainingBytes.writeUInt8(buffer.readUInt8(currentOffset++), i);
  }

  return { payload, bufferRemainingBytes };
}

所描述解决方案的实际测试(64 KiB - 1 字节):


简而言之 - 上述解决方案应该可以很好地处理高达 [64 KiB - 1] 字节的有效负载。它完全用纯 NodeJs 编写,没有任何外部库。我想这就是您在项目中寻找的东西 ;)

请在 GitHub 要点上找到完整版二进制广播应用程序的链接:

有一段时间(在我部署具有更多功能的更新应用程序之前)可以在此处找到上述要点的现场演示:

http://sndu.pl - 让我们把文件发给你