Node.js: 一次接收过多UDP报文,丢失

Node.js: Receiving too many UDP messages at a time, losing them

我的节点服务器在一秒钟内接收到大约 400 条 UDP 消息,并且全部 有效 ,我能够处理所有 400 条其中

但是,当我开始在一秒钟内收到大约 700 条 UDP 消息 时,我丢失了 2-20 条消息,并且他们永远不会被解析:(

我在这里考虑了一些选项

  1. 创建所有套接字消息的队列,然后一个一个消费, 虽然我不确定如何实现这个
    • 不知道如何实施
  2. Node / Express / dgram 套接字中找到一个 setting,我可以在其中增加内存大小/缓冲区大小,诸如此类
    • 虽然我找不到任何这样的设置:(
  3. 使用不同的 UDP 接收器,停止使用节点的内置套接字 UDP 接收器
    • 没有找到其他收件人

这是我的 UDP 发件人的样子:

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");

udpserver.on("message",
        function (msg, rinfo)
        {
        seatStateStore.parseMessage(msg.toString());
    });

有人有什么想法吗?我想不出 3 个选项中的任何一个:/有人可以帮我吗?

节点 v0.10.29

快递 v3.14.0

===============================

更新/解决方案

这是我最终使用的代码(稍微修改了@RoyHB 的解决方案):

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
var Dequeue = require('dequeue');
var FIFO = new Dequeue();

fetcher();

udpserver.on("message",
        function (msg, rinfo)
        {
           FIFO.push(msg.toString());
        });

udpserver.bind(43278);

function fetcher () {
    while (FIFO.length > 0) 
    {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
    setImmediate(fetcher); //make this function continuously run
}

有一个名为 node-dequeue 的 NPM 模块。我经常在与你类似的情况下使用它。

基本上,

  1. 您的程序将收到的消息推送到队列的末尾。
  2. 一个间隔计时器周期性地激活另一个方法或函数(一个队列获取器),它检查队列中是否有消息,如果有,获取一个或多个并处理它。
  3. 或者(可能更好)不使用计时器来安排队列提取。而是使用节点 process.nextTick 方法。

或者,您可以使用节点 process.nextTick 来持续检查消息队列。

理想情况下,seatStateStore.parseMessage 会创建一个新对象来异步处理一条消息,以便 parseMessage returns 不会延迟,而实际的消息处理在后台继续进行。 (见示例代码底部)

我没有测试下面的代码,它是为了说明,而不是运行

var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");

setInterval(fetcher, 1);

var udpserver = dgram.createSocket("udp4");

udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg);
    }
);

function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
}

** 或(可能更好)**

var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");

fetcher();

var udpserver = dgram.createSocket("udp4");

udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg);
    }
);

function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
        process.nextTick(fetcher);
    }
}

seatStateProcessor.parseMessage的大纲:

seatStateProcessor.parseMessage = function (msg) {
    proc = new asyncProcHandler(msg, function (err) {
        if (err) {
            //handle the error
        }
    });
}

我知道这个问题已经有了答案,但截至今天,我从官方文档中找到了增加 dgram 缓冲区的方法:official doc.

socket.setRecvBufferSize(size);
Added in: v8.7.0
size <integer>
Sets the SO_RCVBUF socket option. Sets the maximum socket receive buffer in bytes.

socket.setSendBufferSize(size)
Added in: v8.7.0
size <integer>
Sets the SO_SNDBUF socket option. Sets the maximum socket send buffer in bytes.

用法示例:

var socket = dgram.createSocket('udp4');
socket.on("listening", () => {
    socket.setRecvBufferSize(100000000); // 100mb
    socket.setSendBufferSize(100000000); // 100mb
});

默认值为65507