如何使用交换重用 RabbitMQ 队列?
How to reuse RabbitMQ queues using exchanges?
我会解释我想达到什么,然后我做了什么(没有结果)
我有两个节点服务通过 RabbitMQ 使用交换(主题)连接在它们之间:
我想要的是关闭 C1
,同时仍然向 something.orange.something
发送消息。然后我想重新启动我的 C1
并接收我丢失的所有消息。
现在发生在我身上的是,每次我重新启动时,我的消费者都会创建一个新队列,并在我的交换中使用相同的路由密钥创建一个新绑定。所以我现在有两个队列接收相同的信息。
如果我用参数 {exclusive: true}
配置我的队列,我解决了部分问题,我不再有没有接收者的队列,但仍然有同样的问题...所有消息在没有活动接收者的情况下发送迷路了。
可能吗?
这是我的代码:
发件人:
'use strict';
const amqp = require('amqplib/callback_api');
const logatim = require('logatim');
logatim.setLevel('info')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
let ex = 'direct_colors';
let args = process.argv.slice(2);
let colors = ['colors.en.green', 'colors.en.yellow', 'colors.es.red']
ch.assertExchange(ex, 'topic', {durable: true});
setInterval(() => {
let color = colors[Math.floor(Math.random() * 3)];
let msg = `This is a ${color} message`;
ch.publish(ex, color, new Buffer(msg));
logatim[color.split('.').pop()].info(msg);
}, 1000);
});
});
接收者:
'use strict';
const amqp = require('amqplib/callback_api');
const logatim = require('logatim');
logatim.setLevel('info');
const args = process.argv.slice(2);
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
var ex = 'direct_colors';
ch.assertExchange(ex, 'topic', {durable: true});
ch.assertQueue('', {exclusive: true, durable: true}, (err, q) => {
logatim.green.info(' [*] Waiting for logs. To exit press CTRL+C');
args.forEach((arg) => {
ch.bindQueue(q.queue, ex, arg);
});
ch.consume(q.queue, (msg) => {
logatim[msg.fields.routingKey.split('.').pop()].info(` [x] ${msg.content.toString()}`);
});
});
});
});
您需要命名队列。当您在接收器 class 中声明队列时,给它一个 well-known 名称(常量),例如
ch.assertQueue('my_service_1_queue', {durable: true}, ...
它们的基本示例在 RabbitMQ Tutorial
当您的消费者关闭并且 re-start 时,它将从同一个命名队列中消费。
注意:您不需要那里的独占队列,因为当您的消费者出现故障时它将被删除。
我会解释我想达到什么,然后我做了什么(没有结果)
我有两个节点服务通过 RabbitMQ 使用交换(主题)连接在它们之间:
我想要的是关闭 C1
,同时仍然向 something.orange.something
发送消息。然后我想重新启动我的 C1
并接收我丢失的所有消息。
现在发生在我身上的是,每次我重新启动时,我的消费者都会创建一个新队列,并在我的交换中使用相同的路由密钥创建一个新绑定。所以我现在有两个队列接收相同的信息。
如果我用参数 {exclusive: true}
配置我的队列,我解决了部分问题,我不再有没有接收者的队列,但仍然有同样的问题...所有消息在没有活动接收者的情况下发送迷路了。
可能吗?
这是我的代码:
发件人:
'use strict';
const amqp = require('amqplib/callback_api');
const logatim = require('logatim');
logatim.setLevel('info')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
let ex = 'direct_colors';
let args = process.argv.slice(2);
let colors = ['colors.en.green', 'colors.en.yellow', 'colors.es.red']
ch.assertExchange(ex, 'topic', {durable: true});
setInterval(() => {
let color = colors[Math.floor(Math.random() * 3)];
let msg = `This is a ${color} message`;
ch.publish(ex, color, new Buffer(msg));
logatim[color.split('.').pop()].info(msg);
}, 1000);
});
});
接收者:
'use strict';
const amqp = require('amqplib/callback_api');
const logatim = require('logatim');
logatim.setLevel('info');
const args = process.argv.slice(2);
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
var ex = 'direct_colors';
ch.assertExchange(ex, 'topic', {durable: true});
ch.assertQueue('', {exclusive: true, durable: true}, (err, q) => {
logatim.green.info(' [*] Waiting for logs. To exit press CTRL+C');
args.forEach((arg) => {
ch.bindQueue(q.queue, ex, arg);
});
ch.consume(q.queue, (msg) => {
logatim[msg.fields.routingKey.split('.').pop()].info(` [x] ${msg.content.toString()}`);
});
});
});
});
您需要命名队列。当您在接收器 class 中声明队列时,给它一个 well-known 名称(常量),例如
ch.assertQueue('my_service_1_queue', {durable: true}, ...
它们的基本示例在 RabbitMQ Tutorial
当您的消费者关闭并且 re-start 时,它将从同一个命名队列中消费。 注意:您不需要那里的独占队列,因为当您的消费者出现故障时它将被删除。