Node js TCP 服务器,socket.on('data') - 数据缓冲区包含高负载时的垃圾数据
Node js TCP server, socket.on('data') - data buffer contains garbage data on high load
我使用node js的net server,使用socket.on('data')函数接收数据。为了解析 TCP 消息,我使用解析缓冲区方法。这使用前 4 个字节作为 TCP 消息的长度,以便我可以从 TCP 流中读取并形成单独的 commands.In 摘要在高负载时发生的是,有一些垃圾数据作为 TCP 流的一部分返回这会导致问题。
function onConnect(client) {
var accumulatingBuffer = new Buffer(0);
var totalPacketLen = -1;
var accumulatingLen = 0;
var recvedThisTimeLen = 0;
client.on('data', function (data) {
parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen);
});
}
这里是parsebuffer方法。
function parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen) {
recvedThisTimeLen = Buffer.byteLength(data);
var tmpBuffer = new Buffer(accumulatingLen + recvedThisTimeLen);
accumulatingBuffer.copy(tmpBuffer);
data.copy(tmpBuffer, accumulatingLen); // offset for accumulating
accumulatingBuffer = tmpBuffer;
tmpBuffer = null;
accumulatingLen = accumulatingLen + recvedThisTimeLen;
if (accumulatingLen < PACKETHEADERLEN) {
return;
} else if (accumulatingLen === PACKETHEADERLEN) {
packetHeaderLen
return;
} else {
//a packet info is available..
if (totalPacketLen < 0) {
totalPacketLen = accumulatingBuffer.readUInt32BE(0);
}
}
while (accumulatingLen >= totalPacketLen + PACKETHEADERLEN) {
var aPacketBufExceptHeader = new Buffer(totalPacketLen); // a whole packet is available...
accumulatingBuffer.copy(aPacketBufExceptHeader, 0, PACKETHEADERLEN, PACKETHEADERLEN + totalPacketLen);
////////////////////////////////////////////////////////////////////
//process packet data
var stringData = aPacketBufExceptHeader.toString();
try {
var JSONObject = JSON.parse(stringData);
handler(client, JSONObject);
var newBufRebuild = new Buffer(accumulatingBuffer.length - (totalPacketLen + PACKETHEADERLEN)); // we can reduce size of allocatin
accumulatingBuffer.copy(newBufRebuild, 0, totalPacketLen + PACKETHEADERLEN, accumulatingBuffer.length);
//init
accumulatingLen = accumulatingLen - (totalPacketLen + PACKETHEADERLEN); //totalPacketLen+4
accumulatingBuffer = newBufRebuild;
newBufRebuild = null;
totalPacketLen = -1;
//For a case in which multiple packets are transmitted at once.
if (accumulatingLen <= PACKETHEADERLEN) {
//need to get more data -> wait..
return;
} else {
totalPacketLen = accumulatingBuffer.readUInt32BE(0);
}
} catch (ex) {
console.log(ex + ' unable to process data');
return;
}
}
}
一切都很好,直到使用大量客户端快速发送消息的高模拟负载。此时 ParseBuffer 方法中的第一行 "data.length"returns 超过了 TCP 数据的长度。这会导致代码将垃圾读取为 UInt32BE,这会导致 totalpacketlength 的值非常高(它告诉下一个数据包长度)。这会导致消息丢失。我错过了什么吗?请帮忙。
当您在 parseBuffer()
函数中执行此操作时:
accumulatingBuffer = tmpBuffer;
这只是将 tmpBuffer
分配给名为 accumulatingBuffer
的函数参数。它不会更改 onConnect()
方法中的 accumulatingBuffer
变量。因此,当您获得部分缓冲区时,您会丢失累积的部分。您传递给 parseBuffer()
的其他参数也存在同样的问题。在 parseBuffer()
中分配给它们不会更改 onConnect()
.
中的同名变量
可能有更简单的方法来编写它,但保持相同结构的最简单方法是不传递单个变量,而是传递具有这些变量作为对象属性的单个对象。然后,当您分配给属性时,您可以从 onConnect()
.
中获取这些新值
一般结构如下所示:
function onConnect(client) {
var args = {};
args.accumulatingBuffer = new Buffer(0);
args.totalPacketLen = -1;
args.accumulatingLen = 0;
args.recvedThisTimeLen = 0;
client.on('data', function (data) {
parseBuffer(client, data, args);
});
}
然后在 parseBuffer()
中进行相应的更改,以访问 args
对象上的参数作为属性。由于对象是通过指针传递的,因此当您从 parseBuffer 中分配给 args 对象的属性时,这些属性将在 onConnect
方法的 args
对象中可见。
仅供参考,我没有遵循函数其他地方的整个逻辑,因此也可能存在其他错误。这段代码看起来相当复杂,因为它尝试执行的相当常见的任务有很多缓冲区副本。这也是一种可能已经写过很多次的代码,甚至可能存在于一些预构建的库中。
我使用node js的net server,使用socket.on('data')函数接收数据。为了解析 TCP 消息,我使用解析缓冲区方法。这使用前 4 个字节作为 TCP 消息的长度,以便我可以从 TCP 流中读取并形成单独的 commands.In 摘要在高负载时发生的是,有一些垃圾数据作为 TCP 流的一部分返回这会导致问题。
function onConnect(client) {
var accumulatingBuffer = new Buffer(0);
var totalPacketLen = -1;
var accumulatingLen = 0;
var recvedThisTimeLen = 0;
client.on('data', function (data) {
parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen);
});
}
这里是parsebuffer方法。
function parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen) {
recvedThisTimeLen = Buffer.byteLength(data);
var tmpBuffer = new Buffer(accumulatingLen + recvedThisTimeLen);
accumulatingBuffer.copy(tmpBuffer);
data.copy(tmpBuffer, accumulatingLen); // offset for accumulating
accumulatingBuffer = tmpBuffer;
tmpBuffer = null;
accumulatingLen = accumulatingLen + recvedThisTimeLen;
if (accumulatingLen < PACKETHEADERLEN) {
return;
} else if (accumulatingLen === PACKETHEADERLEN) {
packetHeaderLen
return;
} else {
//a packet info is available..
if (totalPacketLen < 0) {
totalPacketLen = accumulatingBuffer.readUInt32BE(0);
}
}
while (accumulatingLen >= totalPacketLen + PACKETHEADERLEN) {
var aPacketBufExceptHeader = new Buffer(totalPacketLen); // a whole packet is available...
accumulatingBuffer.copy(aPacketBufExceptHeader, 0, PACKETHEADERLEN, PACKETHEADERLEN + totalPacketLen);
////////////////////////////////////////////////////////////////////
//process packet data
var stringData = aPacketBufExceptHeader.toString();
try {
var JSONObject = JSON.parse(stringData);
handler(client, JSONObject);
var newBufRebuild = new Buffer(accumulatingBuffer.length - (totalPacketLen + PACKETHEADERLEN)); // we can reduce size of allocatin
accumulatingBuffer.copy(newBufRebuild, 0, totalPacketLen + PACKETHEADERLEN, accumulatingBuffer.length);
//init
accumulatingLen = accumulatingLen - (totalPacketLen + PACKETHEADERLEN); //totalPacketLen+4
accumulatingBuffer = newBufRebuild;
newBufRebuild = null;
totalPacketLen = -1;
//For a case in which multiple packets are transmitted at once.
if (accumulatingLen <= PACKETHEADERLEN) {
//need to get more data -> wait..
return;
} else {
totalPacketLen = accumulatingBuffer.readUInt32BE(0);
}
} catch (ex) {
console.log(ex + ' unable to process data');
return;
}
}
}
一切都很好,直到使用大量客户端快速发送消息的高模拟负载。此时 ParseBuffer 方法中的第一行 "data.length"returns 超过了 TCP 数据的长度。这会导致代码将垃圾读取为 UInt32BE,这会导致 totalpacketlength 的值非常高(它告诉下一个数据包长度)。这会导致消息丢失。我错过了什么吗?请帮忙。
当您在 parseBuffer()
函数中执行此操作时:
accumulatingBuffer = tmpBuffer;
这只是将 tmpBuffer
分配给名为 accumulatingBuffer
的函数参数。它不会更改 onConnect()
方法中的 accumulatingBuffer
变量。因此,当您获得部分缓冲区时,您会丢失累积的部分。您传递给 parseBuffer()
的其他参数也存在同样的问题。在 parseBuffer()
中分配给它们不会更改 onConnect()
.
可能有更简单的方法来编写它,但保持相同结构的最简单方法是不传递单个变量,而是传递具有这些变量作为对象属性的单个对象。然后,当您分配给属性时,您可以从 onConnect()
.
一般结构如下所示:
function onConnect(client) {
var args = {};
args.accumulatingBuffer = new Buffer(0);
args.totalPacketLen = -1;
args.accumulatingLen = 0;
args.recvedThisTimeLen = 0;
client.on('data', function (data) {
parseBuffer(client, data, args);
});
}
然后在 parseBuffer()
中进行相应的更改,以访问 args
对象上的参数作为属性。由于对象是通过指针传递的,因此当您从 parseBuffer 中分配给 args 对象的属性时,这些属性将在 onConnect
方法的 args
对象中可见。
仅供参考,我没有遵循函数其他地方的整个逻辑,因此也可能存在其他错误。这段代码看起来相当复杂,因为它尝试执行的相当常见的任务有很多缓冲区副本。这也是一种可能已经写过很多次的代码,甚至可能存在于一些预构建的库中。