Amqp、rabbit mq 和 socket.io 重新连接到队列,即使客户端已关闭
Amqp, rabbit mq and socket.io reconnect to a queue even if client is closed
我正在使用 rabbitMQ 和 socket.io 编写浏览器通知代码。除了一种情况,我的配置工作正常。
当我用一个用户登录到我的系统时,它会创建一个通知-UID-用户ID队列(现在queueName由查询oaraeter发送,我将实现更复杂的方法我会尽快解决问题)
如果我在另一个浏览器上使用另一个用户登录,它会创建另一个队列通知-UID-seconduserid。
如果我注销其中一位用户,队列将消失(因为它不持久)。
问题是,当我在另一个会话上刷新或加载另一个页面时,它会重新创建第二个队列,即使未发送参数队列名称也是如此。
server.js
var amqp = require('amqp');
var app = require('express')();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var rabbitMqConnection = null;
var _queue = null;
var _consumerTag = null;
io.use(function (socket, next) {
var handshakeData = socket.handshake;
// Here i will implement token verification
console.log(socket.handshake.query.queueName);
next();
});
// Gets the connection event form client
io.sockets.on('connection', function (socket) {
var queueName = socket.handshake.query.queueName;
console.log("Socket Connected");
// Connects to rabbiMq
rabbitMqConnection = amqp.createConnection({host: 'localhost', reconnect: false});
// Update our stored tag when it changes
rabbitMqConnection.on('tag.change', function (event) {
if (_consumerTag === event.oldConsumerTag) {
_consumerTag = event.consumerTag;
// Consider unsubscribing from the old tag just in case it lingers
_queue.unsubscribe(event.oldConsumerTag);
}
});
// Listen for ready event
rabbitMqConnection.on('ready', function () {
console.log('Connected to rabbitMQ');
// Listen to the queue
rabbitMqConnection.queue(queueName, {
closeChannelOnUnsubscribe: true,
durable: false,
autoClose: true
},
function (queue) {
console.log('Connected to ' + queueName);
_queue = queue;
// Bind to the exchange
queue.bind('users.direct', queueName);
queue.subscribe({ack: false, prefetchCount: 1}, function (message, headers, deliveryInfo, ack) {
console.log("Received a message from route " + deliveryInfo.routingKey);
socket.emit('notification', message);
//ack.acknowledge();
}).addCallback(function (res) {
// Hold on to the consumer tag so we can unsubscribe later
_consumerTag = res.consumerTag;
});
});
});
// Listen for disconnection
socket.on('disconnect', function () {
_queue.unsubscribe(_consumerTag);
rabbitMqConnection.disconnect();
console.log("Socket Disconnected");
});
});
http.listen(8080);
client.js
var io = require('socket.io-client');
$(document).ready(function () {
var socket = io('http://myserver.it:8080/', {
query: { queueName: 'notification-UID-' + UID},
'sync disconnect on unload': true,
});
socket.on('notification', function (data) {
console.log(data);
});
})
有什么想法吗?
所以我解决了我的问题,这是一个把事情搞得一团糟的可变范围问题。
让我解释一下我在做什么,也许对某人有用。
基本上我正在尝试创建一个浏览器通知系统,这意味着我的应用程序向交换发布(生产者端)一个通知对象,其中包含一些信息,例如主题、link 和消息。
交换是一个 fanout (users.notification.fanout),有两个绑定交换:users.direct(直接型)和users.notification.store(扇出型)。
当生产者发布通知时,它会向 users.notification.fanout 发送路由键 "notification-UID-userid"(其中 userid 是真实用户 ID。
通知对象同时到达 users.direct 和 users.notification.store
最后一个有一个消费者在用户未登录的情况下将通知写入数据库,第一个将通知发布到浏览器。
那么浏览器消费者是如何工作的?
我使用了 socket.io、节点服务器和 amqplib 的经典组合。
每次用户登录时,socket.io创建一个队列,名称和路由密钥通知-UID-用户ID并将其绑定到users.direct交换.
与此同时,我已将 https 添加到我的服务器,因此与第一个版本相比有些变化。
您可以阅读评论以了解它的作用。
所以我的server.js是
var amqp = require('amqp');
var fs = require('fs');
var app = require('express')();
// Https server, certificates and private key added
var https = require('https').Server({
key: fs.readFileSync('/home/www/site/privkey.pem'),
cert: fs.readFileSync('/home/www/site/fullchain.pem')},app);
var io = require('socket.io')(https);
// Used to verify if token is valid
// If not it will discard connection
io.use(function (socket, next) {
var handshakeData = socket.handshake;
// Here i will implement token verification
console.log("Check this token: " + handshakeData.query.token);
next();
});
// Gets the connection event from client
io.sockets.on('connection', function (socket) {
// Connection log
console.log("Socket Connected with ID: " + socket.id);
// THIS WAS THE PROBLEM
// Local variables for connections
// Former i've put these variables outside the connection so at
// every client they were "overridden".
// RabbitMq Connection (Just for current client)
var _rabbitMqConnection = null;
// Queue (just for current client)
var _queue = null;
// Consumer tag (just for current client)
var _consumerTag = null;
// Queue name and routing key for current user
var queueName = socket.handshake.query.queueName;
// Connects to rabbiMq with default data to localhost guest guest
_rabbitMqConnection = amqp.createConnection();
// Connection ready
_rabbitMqConnection.on('ready', function () {
// Connection log
console.log('#' + socket.id + ' - Connected to RabbitMQ');
// Creates the queue (default is transient and autodelete)
// https://www.npmjs.com/package/amqp#connectionqueuename-options-opencallback
_rabbitMqConnection.queue(queueName, function (queue) {
// Connection log
console.log('#' + socket.id + ' - Connected to ' + queue.name + ' queue');
// Stores local queue
_queue = queue;
// Bind to the exchange (default)
queue.bind('users.direct', queueName, function () {
// Binding log
console.log('#' + socket.id + ' - Binded to users.direct exchange');
// Consumer definition
queue.subscribe({ack: false}, function (message, headers, deliveryInfo, messageObject) {
// Message log
console.log('#' + socket.id + ' - Received a message from route ' + deliveryInfo.routingKey);
// Emit the message to the client
socket.emit('notification', message);
}).addCallback(function (res) {
// Hold on to the consumer tag so we can unsubscribe later
_consumerTag = res.consumerTag;
// Consumer tag log
console.log('#' + socket.id + ' - Consumer ' + _consumerTag + ' created');
})
});
});
});
// Update our stored tag when it changes
_rabbitMqConnection.on('tag.change', function (event) {
if (_consumerTag === event.oldConsumerTag) {
_consumerTag = event.consumerTag;
// Unsubscribe from the old tag just in case it lingers
_queue.unsubscribe(event.oldConsumerTag);
}
});
// Listen for disconnection
socket.on('disconnect', function () {
_queue.unsubscribe(_consumerTag);
_rabbitMqConnection.disconnect();
console.log('#' + socket.id + ' - Socket Disconnected');
});
});
https.listen(8080);
然后我的client.js
var io = require('socket.io-client');
$(document).ready(function () {
var socket = io('https://myserver.com:8080/', {
secure: true, // for ssl connections
query: { queueName: 'notification-UID-' + UID, token: JWTToken}, // params sent to server, JWTToken for authentication
'sync disconnect on unload': true // Every time the client unload, socket disconnects
});
socket.on('notification', function (data) {
// Do what you want with your data
console.log(data);
});
})
我正在使用 rabbitMQ 和 socket.io 编写浏览器通知代码。除了一种情况,我的配置工作正常。
当我用一个用户登录到我的系统时,它会创建一个通知-UID-用户ID队列(现在queueName由查询oaraeter发送,我将实现更复杂的方法我会尽快解决问题)
如果我在另一个浏览器上使用另一个用户登录,它会创建另一个队列通知-UID-seconduserid。
如果我注销其中一位用户,队列将消失(因为它不持久)。
问题是,当我在另一个会话上刷新或加载另一个页面时,它会重新创建第二个队列,即使未发送参数队列名称也是如此。
server.js
var amqp = require('amqp');
var app = require('express')();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var rabbitMqConnection = null;
var _queue = null;
var _consumerTag = null;
io.use(function (socket, next) {
var handshakeData = socket.handshake;
// Here i will implement token verification
console.log(socket.handshake.query.queueName);
next();
});
// Gets the connection event form client
io.sockets.on('connection', function (socket) {
var queueName = socket.handshake.query.queueName;
console.log("Socket Connected");
// Connects to rabbiMq
rabbitMqConnection = amqp.createConnection({host: 'localhost', reconnect: false});
// Update our stored tag when it changes
rabbitMqConnection.on('tag.change', function (event) {
if (_consumerTag === event.oldConsumerTag) {
_consumerTag = event.consumerTag;
// Consider unsubscribing from the old tag just in case it lingers
_queue.unsubscribe(event.oldConsumerTag);
}
});
// Listen for ready event
rabbitMqConnection.on('ready', function () {
console.log('Connected to rabbitMQ');
// Listen to the queue
rabbitMqConnection.queue(queueName, {
closeChannelOnUnsubscribe: true,
durable: false,
autoClose: true
},
function (queue) {
console.log('Connected to ' + queueName);
_queue = queue;
// Bind to the exchange
queue.bind('users.direct', queueName);
queue.subscribe({ack: false, prefetchCount: 1}, function (message, headers, deliveryInfo, ack) {
console.log("Received a message from route " + deliveryInfo.routingKey);
socket.emit('notification', message);
//ack.acknowledge();
}).addCallback(function (res) {
// Hold on to the consumer tag so we can unsubscribe later
_consumerTag = res.consumerTag;
});
});
});
// Listen for disconnection
socket.on('disconnect', function () {
_queue.unsubscribe(_consumerTag);
rabbitMqConnection.disconnect();
console.log("Socket Disconnected");
});
});
http.listen(8080);
client.js
var io = require('socket.io-client');
$(document).ready(function () {
var socket = io('http://myserver.it:8080/', {
query: { queueName: 'notification-UID-' + UID},
'sync disconnect on unload': true,
});
socket.on('notification', function (data) {
console.log(data);
});
})
有什么想法吗?
所以我解决了我的问题,这是一个把事情搞得一团糟的可变范围问题。 让我解释一下我在做什么,也许对某人有用。
基本上我正在尝试创建一个浏览器通知系统,这意味着我的应用程序向交换发布(生产者端)一个通知对象,其中包含一些信息,例如主题、link 和消息。
交换是一个 fanout (users.notification.fanout),有两个绑定交换:users.direct(直接型)和users.notification.store(扇出型)。
当生产者发布通知时,它会向 users.notification.fanout 发送路由键 "notification-UID-userid"(其中 userid 是真实用户 ID。
通知对象同时到达 users.direct 和 users.notification.store 最后一个有一个消费者在用户未登录的情况下将通知写入数据库,第一个将通知发布到浏览器。
那么浏览器消费者是如何工作的?
我使用了 socket.io、节点服务器和 amqplib 的经典组合。
每次用户登录时,socket.io创建一个队列,名称和路由密钥通知-UID-用户ID并将其绑定到users.direct交换.
与此同时,我已将 https 添加到我的服务器,因此与第一个版本相比有些变化。
您可以阅读评论以了解它的作用。
所以我的server.js是
var amqp = require('amqp');
var fs = require('fs');
var app = require('express')();
// Https server, certificates and private key added
var https = require('https').Server({
key: fs.readFileSync('/home/www/site/privkey.pem'),
cert: fs.readFileSync('/home/www/site/fullchain.pem')},app);
var io = require('socket.io')(https);
// Used to verify if token is valid
// If not it will discard connection
io.use(function (socket, next) {
var handshakeData = socket.handshake;
// Here i will implement token verification
console.log("Check this token: " + handshakeData.query.token);
next();
});
// Gets the connection event from client
io.sockets.on('connection', function (socket) {
// Connection log
console.log("Socket Connected with ID: " + socket.id);
// THIS WAS THE PROBLEM
// Local variables for connections
// Former i've put these variables outside the connection so at
// every client they were "overridden".
// RabbitMq Connection (Just for current client)
var _rabbitMqConnection = null;
// Queue (just for current client)
var _queue = null;
// Consumer tag (just for current client)
var _consumerTag = null;
// Queue name and routing key for current user
var queueName = socket.handshake.query.queueName;
// Connects to rabbiMq with default data to localhost guest guest
_rabbitMqConnection = amqp.createConnection();
// Connection ready
_rabbitMqConnection.on('ready', function () {
// Connection log
console.log('#' + socket.id + ' - Connected to RabbitMQ');
// Creates the queue (default is transient and autodelete)
// https://www.npmjs.com/package/amqp#connectionqueuename-options-opencallback
_rabbitMqConnection.queue(queueName, function (queue) {
// Connection log
console.log('#' + socket.id + ' - Connected to ' + queue.name + ' queue');
// Stores local queue
_queue = queue;
// Bind to the exchange (default)
queue.bind('users.direct', queueName, function () {
// Binding log
console.log('#' + socket.id + ' - Binded to users.direct exchange');
// Consumer definition
queue.subscribe({ack: false}, function (message, headers, deliveryInfo, messageObject) {
// Message log
console.log('#' + socket.id + ' - Received a message from route ' + deliveryInfo.routingKey);
// Emit the message to the client
socket.emit('notification', message);
}).addCallback(function (res) {
// Hold on to the consumer tag so we can unsubscribe later
_consumerTag = res.consumerTag;
// Consumer tag log
console.log('#' + socket.id + ' - Consumer ' + _consumerTag + ' created');
})
});
});
});
// Update our stored tag when it changes
_rabbitMqConnection.on('tag.change', function (event) {
if (_consumerTag === event.oldConsumerTag) {
_consumerTag = event.consumerTag;
// Unsubscribe from the old tag just in case it lingers
_queue.unsubscribe(event.oldConsumerTag);
}
});
// Listen for disconnection
socket.on('disconnect', function () {
_queue.unsubscribe(_consumerTag);
_rabbitMqConnection.disconnect();
console.log('#' + socket.id + ' - Socket Disconnected');
});
});
https.listen(8080);
然后我的client.js
var io = require('socket.io-client');
$(document).ready(function () {
var socket = io('https://myserver.com:8080/', {
secure: true, // for ssl connections
query: { queueName: 'notification-UID-' + UID, token: JWTToken}, // params sent to server, JWTToken for authentication
'sync disconnect on unload': true // Every time the client unload, socket disconnects
});
socket.on('notification', function (data) {
// Do what you want with your data
console.log(data);
});
})