Node.js rsmq - 在 Node.js 应用程序重新启动之前,新消息不会变得可见
Node.js rsmq - New message doesn't become visible until Node.js application is restarted
我正在努力让 this 包工作。
redis版本:稳定4.0.6
我是这样连接Redis的,没有问题。
pubsub.js
var redis = require("redis");
var psRedis = redis.createClient();
psRedis.auth("mypasswordishere", function (callback) {
console.log("connected");
});
module.exports.psRedis = psRedis;
启动Node.js应用程序后,我可以在控制台上看到"connected"并执行操作,我已经检查过了。
我的 test.js
文件在下面。
test.js
var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var RedisSMQ = require("rsmq");
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis});
rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
if (resp === 1) {
console.log("queue created");
}
});
rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
if (resp) {
console.log(resp);
}
});
router.get('/pubsubTest', function (req, res, next) {
async1.waterfall([
function (callback) {
rsmq.sendMessage({qname: "myqueue", message: "Hello World 1"}, function (err, resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}
});
callback(null, 'done!');
}
], function (err, result) {
res.sendStatus(200);
});
});
module.exports = router;
但是,当我访问 /pubsubTest
时,控制台上只显示消息 ID。
Message sent. ID: exb289xu0i7IaQPEy1wA4O7xQQ6n0CAp
如果我重新启动我的 Node.js 应用程序,我会看到下面的结果,这是预期的。为什么它不立即出现?
{ id: 'exb289xu0i7IaQPEy1wA4O7xQQ6n0CAp',
message: 'Hello World 1',
rc: 1,
fr: 1515802884138,
sent: 1515802880098 }
谢谢。
receiveMessage
不会"fire"。您需要在 发送消息后 调用它。
您正在寻找的是 rsmq.
提供的 realtime 选项
var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});
现在,对于通过 sendMessage
添加到队列的每条新消息,都会向 rsmq:rt:{qname}
发送内容为 {msg}
的 PUBLISH 消息。在您的情况下,sendMessage
将发出一个事件,即 rsmq:rt:myqueue
对此有两种解决方案,都将使用事件 rsmq:rt:myqueue
1.First一个会用到redis客户端,可以通过redis提供的subscribe
方法订阅这个发布的事件,实现PUB/SUB.
var redis = require('redis');
const subscribe = redis.createClient();
subscribe.subscribe('rsmq:rt:myqueue');
subscribe.on('message', function(msg) { //msg=>'rsmq:rt:myqueue'
rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
if (resp) {
console.log(resp);
}
});
});
整个代码看起来像这样:
var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});
rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
if (resp === 1) {
console.log("queue created");
}
});
const subscribe = redis.createClient( 6379,"127.0.0.1"); //creating new
worker and pass your credentials
subscribe.subscribe('rsmq:rt:myqueue');
subscribe.on('message', function(msg) { //msg=>'rsmq:rt:myqueue'
rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
if (resp) {
console.log(resp);
}
});
});
router.get('/pubsubTest', function (req, res, next) {
async1.waterfall([
function (callback) {
rsmq.sendMessage({qname: "myqueue", message: "Hello World 1"},
function (err,
resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}});
callback(null, 'done!');
}
], function (err, result) {
res.sendStatus(200);
});
});
module.exports = router;
2.Second 解决方案是使用 rsmq-worker
,它将为您提供一个 message 事件,您可以使用 .on
方法收听。
var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1}); // this worker
will poll the queue every .1 second.
worker.on( "message", function( message, next, msgid ){
if(message){
console.log(message);
}
next();
});
worker.start();
整个代码看起来像这样:
var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis},{ ns: "rsmq",realtime :true});
rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
if (resp === 1) {
console.log("queue created");
}
});
var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1});
worker.on( "message", function( message, next, msgid ){
if(message){
console.log(message);
}
next();
});
// optional error listeners
worker.on('error', function( err, msg ){
console.log( "ERROR", err, msg.id );
});
worker.on('exceeded', function( msg ){
console.log( "EXCEEDED", msg.id );
});
worker.on('timeout', function( msg ){
console.log( "TIMEOUT", msg.id, msg.rc );
});
worker.start();
router.get('/pubsubTest', function (req, res, next) {
async1.waterfall([
function (callback) {
rsmq.sendMessage({qname: "myqueue", message: "Hello World1"}
,function (err, resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}});
callback(null, 'done!');
}
], function (err, result) {
res.sendStatus(200);
});
});
module.exports = router;
注意:在第一个解决方案中,您需要使用 deleteMessage
删除从队列中收到的消息,或者您也可以使用 popMessage
来接收最后一条消息并为您删除.如果您不删除该消息,您将再次收到所有消息,直到该特定消息的超时结束。
出于这个原因,我更喜欢使用第二种解决方案,rsmq 会为您处理这些事情,您也可以提供自己的轮询间隔
我正在努力让 this 包工作。
redis版本:稳定4.0.6
我是这样连接Redis的,没有问题。
pubsub.js
var redis = require("redis");
var psRedis = redis.createClient();
psRedis.auth("mypasswordishere", function (callback) {
console.log("connected");
});
module.exports.psRedis = psRedis;
启动Node.js应用程序后,我可以在控制台上看到"connected"并执行操作,我已经检查过了。
我的 test.js
文件在下面。
test.js
var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var RedisSMQ = require("rsmq");
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis});
rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
if (resp === 1) {
console.log("queue created");
}
});
rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
if (resp) {
console.log(resp);
}
});
router.get('/pubsubTest', function (req, res, next) {
async1.waterfall([
function (callback) {
rsmq.sendMessage({qname: "myqueue", message: "Hello World 1"}, function (err, resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}
});
callback(null, 'done!');
}
], function (err, result) {
res.sendStatus(200);
});
});
module.exports = router;
但是,当我访问 /pubsubTest
时,控制台上只显示消息 ID。
Message sent. ID: exb289xu0i7IaQPEy1wA4O7xQQ6n0CAp
如果我重新启动我的 Node.js 应用程序,我会看到下面的结果,这是预期的。为什么它不立即出现?
{ id: 'exb289xu0i7IaQPEy1wA4O7xQQ6n0CAp',
message: 'Hello World 1',
rc: 1,
fr: 1515802884138,
sent: 1515802880098 }
谢谢。
receiveMessage
不会"fire"。您需要在 发送消息后 调用它。
您正在寻找的是 rsmq.
var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});
现在,对于通过 sendMessage
添加到队列的每条新消息,都会向 rsmq:rt:{qname}
发送内容为 {msg}
的 PUBLISH 消息。在您的情况下,sendMessage
将发出一个事件,即 rsmq:rt:myqueue
对此有两种解决方案,都将使用事件 rsmq:rt:myqueue
1.First一个会用到redis客户端,可以通过redis提供的subscribe
方法订阅这个发布的事件,实现PUB/SUB.
var redis = require('redis');
const subscribe = redis.createClient();
subscribe.subscribe('rsmq:rt:myqueue');
subscribe.on('message', function(msg) { //msg=>'rsmq:rt:myqueue'
rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
if (resp) {
console.log(resp);
}
});
});
整个代码看起来像这样:
var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});
rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
if (resp === 1) {
console.log("queue created");
}
});
const subscribe = redis.createClient( 6379,"127.0.0.1"); //creating new
worker and pass your credentials
subscribe.subscribe('rsmq:rt:myqueue');
subscribe.on('message', function(msg) { //msg=>'rsmq:rt:myqueue'
rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
if (resp) {
console.log(resp);
}
});
});
router.get('/pubsubTest', function (req, res, next) {
async1.waterfall([
function (callback) {
rsmq.sendMessage({qname: "myqueue", message: "Hello World 1"},
function (err,
resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}});
callback(null, 'done!');
}
], function (err, result) {
res.sendStatus(200);
});
});
module.exports = router;
2.Second 解决方案是使用 rsmq-worker
,它将为您提供一个 message 事件,您可以使用 .on
方法收听。
var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1}); // this worker
will poll the queue every .1 second.
worker.on( "message", function( message, next, msgid ){
if(message){
console.log(message);
}
next();
});
worker.start();
整个代码看起来像这样:
var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis},{ ns: "rsmq",realtime :true});
rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
if (resp === 1) {
console.log("queue created");
}
});
var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1});
worker.on( "message", function( message, next, msgid ){
if(message){
console.log(message);
}
next();
});
// optional error listeners
worker.on('error', function( err, msg ){
console.log( "ERROR", err, msg.id );
});
worker.on('exceeded', function( msg ){
console.log( "EXCEEDED", msg.id );
});
worker.on('timeout', function( msg ){
console.log( "TIMEOUT", msg.id, msg.rc );
});
worker.start();
router.get('/pubsubTest', function (req, res, next) {
async1.waterfall([
function (callback) {
rsmq.sendMessage({qname: "myqueue", message: "Hello World1"}
,function (err, resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}});
callback(null, 'done!');
}
], function (err, result) {
res.sendStatus(200);
});
});
module.exports = router;
注意:在第一个解决方案中,您需要使用 deleteMessage
删除从队列中收到的消息,或者您也可以使用 popMessage
来接收最后一条消息并为您删除.如果您不删除该消息,您将再次收到所有消息,直到该特定消息的超时结束。
出于这个原因,我更喜欢使用第二种解决方案,rsmq 会为您处理这些事情,您也可以提供自己的轮询间隔