amqp.node 不会检测到连接中断
amqp.node won't detect a connection drop
我们有一个 node.js 脚本运行 socket.io 服务器,其客户端使用来自 RabbitMQ 队列的消息。我们最近迁移到了 Amazon AWS,而 RabbitMQ 现在是两台机器(冗余实例)的集群。 AMQP 连接有时会丢失(这是来自具有冗余 VM 的高可用性环境的限制,我们必须应对它)并且如果尝试重新连接,DNS 会选择连接到哪个实例(它是一个具有数据复制功能的集群,因此连接到哪个实例并不重要。
问题是从未尝试重新连接;过了一会儿,当连接丢失时,amqp.node 显然没有注意到连接已经丢失。此外,消费者停止接收消息,socket.io 服务器只是停止接受新连接。
我们在 RabbitMQ URL 上设置了 55 秒的心跳超时(不要与 socket.io 心跳超时混淆)并且正在检查 'error' 和 'close' 事件与 amqp.node 的回调 API 但它们显然从未发布过。队列期望消费的消息被确认。我们希望节点脚本检测到丢失的连接并自行完成,因此环境将自动启动一个新进程并再次建立连接。
这是代码,也许我们在 amqp.node 回调 API 或其他方面做错了什么。
var express = require('express');
app = express();
var http = require('http');
var serverio = http.createServer(app);
var io = require('socket.io').listen(serverio, { log: false });
var socket;
var allcli = [];
var red, blue, green, magenta, reset;
red = '3[31m';
blue = '3[34m';
green = '3[32m';
magenta = '3[35m';
orange = '3[43m';
reset = '3[0m';
var queue = 'ha.atualizacao_mobile';
var urlRabbit = 'amqp://login:password@host?heartbeat=55' // Amazon
var amqp = require('amqplib/callback_api');
var debug = true;
console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds.");
io.set('heartbeat interval', 10 * 60);
console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients.");
console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds.");
io.set('heartbeat timeout', 11 * 60);
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds.");
io.sockets.on('connection', function(socket){
socket.on('error', function (exc) {
console.log(orange+"Ignoring exception: " + exc + reset);
});
socket.on('send-indice', function (data) {
// Some business logic
});
socket.on('disconnect', function () {
// Some business logic
});
});
function updatecli(data){
// Some business logic
}
amqp.connect(urlRabbit, null, function(err, conn) {
if (err !== null) {
return console.log("Error creating connection: " + err);
}
conn.on('error', function(err) {
console.log("Generated event 'error': " + err);
});
conn.on('close', function() {
console.log("Connection closed.");
process.exit();
});
processRabbitConnection(conn, function() {
conn.close();
});
});
function processRabbitConnection(conn, finalize) {
conn.createChannel(function(err, channel) {
if (err != null) {
console.log("Error creating channel: " + err);
return finalize();
}
channel.assertQueue(queue, null, function(err, ok) {
if (err !== null) {
console.log("Error asserting queue " + queue + ": " + err);
return finalize();
}
channel.consume(queue, function (msg) {
if (msg !== null) {
try {
var dataObj = JSON.parse(msg.content);
if (debug == true) {
//console.log(dataObj);
}
updatecli(dataObj);
} catch(err) {
console.log("Error in JSON: " + err);
}
channel.ack(msg);
}
}, null, function(err, ok) {
if (err !== null) {
console.log("Error consuming message: " + err);
return finalize();
}
});
});
});
}
serverio.listen(9128, function () {
console.log('Server: Socket IO Online - Port: 9128 - ' + new Date());
});
看来问题已经解决了。近 60 秒的心跳是问题所在。它与 RabbitMQ 负载均衡器冲突,RabbitMQ 负载均衡器每 1 分钟左右检查一次数据是否通过连接(如果没有数据通过,它会断开连接)。 AMQP 连接停止接收消息,图书馆显然对此没有反应。为了避免这种情况,需要较低的心跳(例如 30 秒)。
我们有一个 node.js 脚本运行 socket.io 服务器,其客户端使用来自 RabbitMQ 队列的消息。我们最近迁移到了 Amazon AWS,而 RabbitMQ 现在是两台机器(冗余实例)的集群。 AMQP 连接有时会丢失(这是来自具有冗余 VM 的高可用性环境的限制,我们必须应对它)并且如果尝试重新连接,DNS 会选择连接到哪个实例(它是一个具有数据复制功能的集群,因此连接到哪个实例并不重要。
问题是从未尝试重新连接;过了一会儿,当连接丢失时,amqp.node 显然没有注意到连接已经丢失。此外,消费者停止接收消息,socket.io 服务器只是停止接受新连接。
我们在 RabbitMQ URL 上设置了 55 秒的心跳超时(不要与 socket.io 心跳超时混淆)并且正在检查 'error' 和 'close' 事件与 amqp.node 的回调 API 但它们显然从未发布过。队列期望消费的消息被确认。我们希望节点脚本检测到丢失的连接并自行完成,因此环境将自动启动一个新进程并再次建立连接。
这是代码,也许我们在 amqp.node 回调 API 或其他方面做错了什么。
var express = require('express');
app = express();
var http = require('http');
var serverio = http.createServer(app);
var io = require('socket.io').listen(serverio, { log: false });
var socket;
var allcli = [];
var red, blue, green, magenta, reset;
red = '3[31m';
blue = '3[34m';
green = '3[32m';
magenta = '3[35m';
orange = '3[43m';
reset = '3[0m';
var queue = 'ha.atualizacao_mobile';
var urlRabbit = 'amqp://login:password@host?heartbeat=55' // Amazon
var amqp = require('amqplib/callback_api');
var debug = true;
console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds.");
io.set('heartbeat interval', 10 * 60);
console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients.");
console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds.");
io.set('heartbeat timeout', 11 * 60);
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds.");
io.sockets.on('connection', function(socket){
socket.on('error', function (exc) {
console.log(orange+"Ignoring exception: " + exc + reset);
});
socket.on('send-indice', function (data) {
// Some business logic
});
socket.on('disconnect', function () {
// Some business logic
});
});
function updatecli(data){
// Some business logic
}
amqp.connect(urlRabbit, null, function(err, conn) {
if (err !== null) {
return console.log("Error creating connection: " + err);
}
conn.on('error', function(err) {
console.log("Generated event 'error': " + err);
});
conn.on('close', function() {
console.log("Connection closed.");
process.exit();
});
processRabbitConnection(conn, function() {
conn.close();
});
});
function processRabbitConnection(conn, finalize) {
conn.createChannel(function(err, channel) {
if (err != null) {
console.log("Error creating channel: " + err);
return finalize();
}
channel.assertQueue(queue, null, function(err, ok) {
if (err !== null) {
console.log("Error asserting queue " + queue + ": " + err);
return finalize();
}
channel.consume(queue, function (msg) {
if (msg !== null) {
try {
var dataObj = JSON.parse(msg.content);
if (debug == true) {
//console.log(dataObj);
}
updatecli(dataObj);
} catch(err) {
console.log("Error in JSON: " + err);
}
channel.ack(msg);
}
}, null, function(err, ok) {
if (err !== null) {
console.log("Error consuming message: " + err);
return finalize();
}
});
});
});
}
serverio.listen(9128, function () {
console.log('Server: Socket IO Online - Port: 9128 - ' + new Date());
});
看来问题已经解决了。近 60 秒的心跳是问题所在。它与 RabbitMQ 负载均衡器冲突,RabbitMQ 负载均衡器每 1 分钟左右检查一次数据是否通过连接(如果没有数据通过,它会断开连接)。 AMQP 连接停止接收消息,图书馆显然对此没有反应。为了避免这种情况,需要较低的心跳(例如 30 秒)。