Node.js: 一次接收过多UDP报文,丢失
Node.js: Receiving too many UDP messages at a time, losing them
我的节点服务器在一秒钟内接收到大约 400 条 UDP 消息,并且全部 有效 ,我能够处理所有 400 条其中
但是,当我开始在一秒钟内收到大约 700 条 UDP 消息 时,我丢失了 2-20 条消息,并且他们永远不会被解析:(
我在这里考虑了一些选项:
- 创建所有套接字消息的队列,然后一个一个消费,
虽然我不确定如何实现这个
- 不知道如何实施
- 在 Node / Express / dgram 套接字中找到一个 setting,我可以在其中增加内存大小/缓冲区大小,诸如此类
- 虽然我找不到任何这样的设置:(
- 使用不同的 UDP 接收器,停止使用节点的内置套接字 UDP 接收器
- 没有找到其他收件人
这是我的 UDP 发件人的样子:
var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
udpserver.on("message",
function (msg, rinfo)
{
seatStateStore.parseMessage(msg.toString());
});
有人有什么想法吗?我想不出 3 个选项中的任何一个:/有人可以帮我吗?
节点 v0.10.29
快递 v3.14.0
===============================
更新/解决方案
这是我最终使用的代码(稍微修改了@RoyHB 的解决方案):
var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
var Dequeue = require('dequeue');
var FIFO = new Dequeue();
fetcher();
udpserver.on("message",
function (msg, rinfo)
{
FIFO.push(msg.toString());
});
udpserver.bind(43278);
function fetcher () {
while (FIFO.length > 0)
{
var msg = FIFO.shift();
seatStateStore.parseMessage(msg);
}
setImmediate(fetcher); //make this function continuously run
}
有一个名为 node-dequeue 的 NPM 模块。我经常在与你类似的情况下使用它。
基本上,
- 您的程序将收到的消息推送到队列的末尾。
- 一个间隔计时器周期性地激活另一个方法或函数(一个队列获取器),它检查队列中是否有消息,如果有,获取一个或多个并处理它。
- 或者(可能更好)不使用计时器来安排队列提取。而是使用节点 process.nextTick 方法。
或者,您可以使用节点 process.nextTick 来持续检查消息队列。
理想情况下,seatStateStore.parseMessage 会创建一个新对象来异步处理一条消息,以便 parseMessage returns 不会延迟,而实际的消息处理在后台继续进行。 (见示例代码底部)
我没有测试下面的代码,它是为了说明,而不是运行
var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");
setInterval(fetcher, 1);
var udpserver = dgram.createSocket("udp4");
udpserver.on("message",
function (msg, rinfo) {
FIFO.push(msg);
}
);
function fetcher () {
while (FIFO.length > 0) {
var msg = FIFO.shift();
seatStateStore.parseMessage(msg);
}
}
** 或(可能更好)**
var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");
fetcher();
var udpserver = dgram.createSocket("udp4");
udpserver.on("message",
function (msg, rinfo) {
FIFO.push(msg);
}
);
function fetcher () {
while (FIFO.length > 0) {
var msg = FIFO.shift();
seatStateStore.parseMessage(msg);
process.nextTick(fetcher);
}
}
seatStateProcessor.parseMessage的大纲:
seatStateProcessor.parseMessage = function (msg) {
proc = new asyncProcHandler(msg, function (err) {
if (err) {
//handle the error
}
});
}
我知道这个问题已经有了答案,但截至今天,我从官方文档中找到了增加 dgram 缓冲区的方法:official doc.
socket.setRecvBufferSize(size);
Added in: v8.7.0
size <integer>
Sets the SO_RCVBUF socket option. Sets the maximum socket receive buffer in bytes.
socket.setSendBufferSize(size)
Added in: v8.7.0
size <integer>
Sets the SO_SNDBUF socket option. Sets the maximum socket send buffer in bytes.
用法示例:
var socket = dgram.createSocket('udp4');
socket.on("listening", () => {
socket.setRecvBufferSize(100000000); // 100mb
socket.setSendBufferSize(100000000); // 100mb
});
默认值为65507
我的节点服务器在一秒钟内接收到大约 400 条 UDP 消息,并且全部 有效 ,我能够处理所有 400 条其中
但是,当我开始在一秒钟内收到大约 700 条 UDP 消息 时,我丢失了 2-20 条消息,并且他们永远不会被解析:(
我在这里考虑了一些选项:
- 创建所有套接字消息的队列,然后一个一个消费,
虽然我不确定如何实现这个
- 不知道如何实施
- 在 Node / Express / dgram 套接字中找到一个 setting,我可以在其中增加内存大小/缓冲区大小,诸如此类
- 虽然我找不到任何这样的设置:(
- 使用不同的 UDP 接收器,停止使用节点的内置套接字 UDP 接收器
- 没有找到其他收件人
这是我的 UDP 发件人的样子:
var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
udpserver.on("message",
function (msg, rinfo)
{
seatStateStore.parseMessage(msg.toString());
});
有人有什么想法吗?我想不出 3 个选项中的任何一个:/有人可以帮我吗?
节点 v0.10.29
快递 v3.14.0
===============================
更新/解决方案
这是我最终使用的代码(稍微修改了@RoyHB 的解决方案):
var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
var Dequeue = require('dequeue');
var FIFO = new Dequeue();
fetcher();
udpserver.on("message",
function (msg, rinfo)
{
FIFO.push(msg.toString());
});
udpserver.bind(43278);
function fetcher () {
while (FIFO.length > 0)
{
var msg = FIFO.shift();
seatStateStore.parseMessage(msg);
}
setImmediate(fetcher); //make this function continuously run
}
有一个名为 node-dequeue 的 NPM 模块。我经常在与你类似的情况下使用它。
基本上,
- 您的程序将收到的消息推送到队列的末尾。
- 一个间隔计时器周期性地激活另一个方法或函数(一个队列获取器),它检查队列中是否有消息,如果有,获取一个或多个并处理它。
- 或者(可能更好)不使用计时器来安排队列提取。而是使用节点 process.nextTick 方法。
或者,您可以使用节点 process.nextTick 来持续检查消息队列。
理想情况下,seatStateStore.parseMessage 会创建一个新对象来异步处理一条消息,以便 parseMessage returns 不会延迟,而实际的消息处理在后台继续进行。 (见示例代码底部)
我没有测试下面的代码,它是为了说明,而不是运行
var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");
setInterval(fetcher, 1);
var udpserver = dgram.createSocket("udp4");
udpserver.on("message",
function (msg, rinfo) {
FIFO.push(msg);
}
);
function fetcher () {
while (FIFO.length > 0) {
var msg = FIFO.shift();
seatStateStore.parseMessage(msg);
}
}
** 或(可能更好)**
var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");
fetcher();
var udpserver = dgram.createSocket("udp4");
udpserver.on("message",
function (msg, rinfo) {
FIFO.push(msg);
}
);
function fetcher () {
while (FIFO.length > 0) {
var msg = FIFO.shift();
seatStateStore.parseMessage(msg);
process.nextTick(fetcher);
}
}
seatStateProcessor.parseMessage的大纲:
seatStateProcessor.parseMessage = function (msg) {
proc = new asyncProcHandler(msg, function (err) {
if (err) {
//handle the error
}
});
}
我知道这个问题已经有了答案,但截至今天,我从官方文档中找到了增加 dgram 缓冲区的方法:official doc.
socket.setRecvBufferSize(size);
Added in: v8.7.0
size <integer>
Sets the SO_RCVBUF socket option. Sets the maximum socket receive buffer in bytes.
socket.setSendBufferSize(size)
Added in: v8.7.0
size <integer>
Sets the SO_SNDBUF socket option. Sets the maximum socket send buffer in bytes.
用法示例:
var socket = dgram.createSocket('udp4');
socket.on("listening", () => {
socket.setRecvBufferSize(100000000); // 100mb
socket.setSendBufferSize(100000000); // 100mb
});
默认值为65507