DNS 循环故障转移不适用于 mqtt.js

DNS Round Robin failover doesn't work with mqtt.js

我有一个使用 mqtt.js 连接到 emqx 集群的 NodeJS 应用程序。

MQTT 集群包含 2 个节点,我尝试为使用 DNS 轮询提供故障转移。所以我有 1 个 A-Record(假设 mqtt.example.com)指向 2 个 IP(IP1 和 IP2)。当两个节点都在线时,我的 NodeJS 应用程序也可以正常连接并订阅所选主题。

现在,在 MQTT 节点上,我可以看到应用程序连接到哪个节点。当我现在停止应用程序连接到的节点时,我希望它(迟早)会故障转移到第二个活动节点。

我还测试了 Loraserver(连接到 MQTT)以及 MQTT 的 Node Red 实现,当我停止它们所连接的节点时,它们都立即连接到活动节点。

但是我的 mqtt.js NodeJS 应用程序一直在尝试连接到我刚刚停止的节点,而不是尝试连接到活动节点。

场景说明:

  1. 我有 2 个活动节点,IP1 和 IP2
  2. 我将 Loraserver、Node Red 和 NodeJS 连接到 mqtt.example.com
  3. 所有 3 个应用程序都连接到 IP1
  4. 我通过关闭emqx进程来停止IP1节点
  5. Loraserver 和 Node Red 会立即自动连接到 IP2
  6. 带有 mqtt.js 的 NodeJS 但是一直向我显示错误消息

Error: connect ECONNREFUSED

使用 IP1 并且不会故障转移到 IP2(保持它 运行 大约 20 分钟,没有任何反应。DNS 租用时间设置为 5 分钟,如果有任何相关性的话)

如何为使用 mqtt.js 的应用程序使用 DNS Round Robing 实现故障转移? 感谢您的帮助

编辑:根据要求,添加了测试代码:

const mqtt = require('mqtt');
const tls = require('tls');
const MQTTTOPIC = 'test/upload';
const BROKER_URL = 'tls://mqtt.example.com';
const BROKER_PORT = '8883';
const MQTTUSER = 'username';
const MQTTPASS = 'password';

var mqttoptions = {
    clientId: MQTTUSER,
    port: BROKER_PORT,
    keepalive: 60,
    username: MQTTUSER,
    password: MQTTPASS
};

var client = mqtt.connect(BROKER_URL, mqttoptions);
client.on('connect', mqtt_connect);
client.on('reconnect', mqtt_reconnect);
client.on('error', mqtt_error);
client.on('message', mqtt_messsageReceived);
client.on('close', mqtt_close);

function mqtt_connect() {
    console.log("Connecting MQTT");
    client.subscribe(MQTTTOPIC, mqtt_subscribe);
}

function mqtt_subscribe(err, granted) {
    console.log("Subscribed to " + MQTTTOPIC);
    if (err) { console.error(err); }
}

function mqtt_reconnect(err) {
    console.log("Reconnect MQTT");
    if (err) { console.error(err); }
    client = mqtt.connect(BROKER_URL, mqttoptions);
}

function mqtt_error(err) {
    console.error("MQTT Error!");
    if (err) { console.error(err); }
}

function after_publish() {
    //do nothing
}

function mqtt_close() {
    console.warn("Close MQTT");
}

function mqtt_messsageReceived(topic, message, packet) {
    console.log("Message: " + message + " --- Received on Topic " + topic);
}

编辑 2: 以防万一,我是 运行 带有 pm2

的代码

编辑 3: 加上完整的日志输出:

17|LOCALTE | Connecting MQTT
17|LOCALTE | Subscribed to test/upload
17|LOCALTE | Close MQTT
17|LOCALTE | Reconnect MQTT
17|LOCALTE | Error: connect ECONNREFUSED IP1:8883
17|LOCALTE |     at Object.exports._errnoException (util.js:1034:11)
17|LOCALTE |     at exports._exceptionWithHostPort (util.js:1057:20)
17|LOCALTE |     at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1096:14)
17|LOCALTE | MQTT Error!
17|LOCALTE | { Error: connect ECONNREFUSED IP1:8883
17|LOCALTE |     at Object.exports._errnoException (util.js:1034:11)
17|LOCALTE |     at exports._exceptionWithHostPort (util.js:1057:20)
17|LOCALTE |     at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1096:14)
17|LOCALTE |   code: 'ECONNREFUSED',
17|LOCALTE |   errno: 'ECONNREFUSED',
17|LOCALTE |   syscall: 'connect',
17|LOCALTE |   address: 'IP1',
17|LOCALTE |   port: 8883 }
17|LOCALTE | Close MQTT
17|LOCALTE | Reconnect MQTT
[...]

首先,您不应该在 on.('reconnect') 回调中调用 connect()。图书馆将为您处理所有这一切。

...
function mqtt_reconnect(err) {
    console.log("Reconnect MQTT");
    if (err) { console.error(err); }
}
...

如果删除它后它仍然不起作用,作为解决方法,您可以列出一组服务器,图书馆将自行循环使用这些服务器。

var mqttoptions = {
    clientId: MQTTUSER,
    port: BROKER_PORT,
    keepalive: 60,
    username: MQTTUSER,
    password: MQTTPASS
    servers:[
        {
            protocol: 'mqtts',
            host: 'ip-address-1',
            port: 8883
        },
        {
            protocol: 'mqtts',
            host: 'ip-address-2',
            port: 8883
        }
    ]
}