Headers 在 Node.js 中使用 RabbitMQ 交换示例
Headers exchange example using RabbitMQ in Node.js
我一直在到处寻找 headers exchange
在 Node.js 中使用 RabbitMQ 的示例。如果有人能指出我正确的方向,那就太好了。这是我目前所拥有的:
发布者方法(创建发布者)
RabbitMQ.prototype.publisher = function(exchange, type) {
console.log('New publisher, exchange: '+exchange+', type: '+type);
amqp.then(function(conn) {
conn.createConfirmChannel().then(function(ch) {
publishers[exchange] = {};
publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
publishers[exchange].ch = ch;
});
},function(err){
console.error("[AMQP]", err.message);
return setTimeout(function(){
self.connect(URI);
}, 1000);
}).then(null, console.log);
};
发布方法
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
try {
publishers[exchange].assert.then(function(){
publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
publishers[exchange].ch.connection.close();
}
});
});
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
};
消费者方法(创建消费者)
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
amqp.then(function(conn) {
conn.createChannel().then(function(ch) {
var ok = ch.assertExchange(exchange, type, {durable: true});
ok.then(function() {
ch.assertQueue('', {exclusive: true});
});
ok = ok.then(function(qok) {
var queue = qok.queue;
ch.bindQueue(queue,exchange,routingKey)
});
ok = ok.then(function(queue) {
ch.consume(queue, function(msg){
cb(msg,ch);
}, {noAck: false});
});
ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
});
}).then(null, console.warn);
};
以上示例适用于 topics
,但我不确定如何过渡到 headers
。我很确定我需要改变我的绑定方法,但还没有找到任何关于如何准确地完成这个的例子。
如有任何帮助,我们将不胜感激!
我在寻找与 amqplib. Unfortunately, like you I found all available documentation lacking 相同的答案时偶然发现了这个问题。在查看了源代码、阅读了一些协议并尝试了一些组合之后,这终于为我做到了。
...
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...
...
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...
完整的工作代码如下。下面的授权信息是伪造的,所以你必须使用你自己的。我还使用 ES6、nodejs 6.5 版和 amqplib。使用保留字作为 header 名称给 headers x-
前缀 and/or 可能存在问题,但我不太确定(我必须看看RabbitMQ 源代码)。
emit.js:
#!/usr/bin/env node
const XCHANGE = 'headers-exchange';
const Q = require('q');
const Broker = require('amqplib');
let scope = 'anonymous';
process.on('uncaughtException', (exception) => {
console.error(`"::ERROR:: Uncaught exception ${exception}`);
});
process.argv.slice(2).forEach((arg) =>
{
scope = arg;
console.info('[*] Scope now set to ' + scope);
});
Q.spawn(function*()
{
let conn = yield Broker.connect('amqp://root:root@localhost');
let chan = yield conn.createChannel();
chan.assertExchange(XCHANGE, 'headers', { durable: false });
for(let count=0;; count=++count%3)
{
let output = (new Date()).toString();
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
console.log(`[x] Published item "${output}" to <${XCHANGE} : ${JSON.stringify(opts)}>`);
yield Q.delay(500);
}
});
receive.js:
#!/usr/bin/env node
const Q = require('q');
const Broker = require('amqplib');
const uuid = require('node-uuid');
const Rx = require('rx');
Rx.Node = require('rx-node');
const XCHANGE = 'headers-exchange';
const WORKER_ID = uuid.v4();
const WORKER_SHORT_ID = WORKER_ID.substr(0, 4);
Q.spawn(function*() {
let conn = yield Broker.connect('amqp://root:root@localhost');
let chan = yield conn.createChannel();
chan.assertExchange(XCHANGE, 'headers', { durable: false });
let q = yield chan.assertQueue('', { exclusive: true });
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
console.info('[*] Binding with ' + JSON.stringify(opts));
console.log(`[*] Subscriber ${WORKER_ID} (${WORKER_SHORT_ID}) is online!`);
chan.consume(q.queue, (msg) =>
{
console.info(`[x](${WORKER_SHORT_ID}) Received pub "${msg.content.toString()}"`);
chan.ack(msg);
});
});
我一直在到处寻找 headers exchange
在 Node.js 中使用 RabbitMQ 的示例。如果有人能指出我正确的方向,那就太好了。这是我目前所拥有的:
发布者方法(创建发布者)
RabbitMQ.prototype.publisher = function(exchange, type) {
console.log('New publisher, exchange: '+exchange+', type: '+type);
amqp.then(function(conn) {
conn.createConfirmChannel().then(function(ch) {
publishers[exchange] = {};
publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
publishers[exchange].ch = ch;
});
},function(err){
console.error("[AMQP]", err.message);
return setTimeout(function(){
self.connect(URI);
}, 1000);
}).then(null, console.log);
};
发布方法
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
try {
publishers[exchange].assert.then(function(){
publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
publishers[exchange].ch.connection.close();
}
});
});
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
};
消费者方法(创建消费者)
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
amqp.then(function(conn) {
conn.createChannel().then(function(ch) {
var ok = ch.assertExchange(exchange, type, {durable: true});
ok.then(function() {
ch.assertQueue('', {exclusive: true});
});
ok = ok.then(function(qok) {
var queue = qok.queue;
ch.bindQueue(queue,exchange,routingKey)
});
ok = ok.then(function(queue) {
ch.consume(queue, function(msg){
cb(msg,ch);
}, {noAck: false});
});
ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
});
}).then(null, console.warn);
};
以上示例适用于 topics
,但我不确定如何过渡到 headers
。我很确定我需要改变我的绑定方法,但还没有找到任何关于如何准确地完成这个的例子。
如有任何帮助,我们将不胜感激!
我在寻找与 amqplib. Unfortunately, like you I found all available documentation lacking 相同的答案时偶然发现了这个问题。在查看了源代码、阅读了一些协议并尝试了一些组合之后,这终于为我做到了。
...
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...
...
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...
完整的工作代码如下。下面的授权信息是伪造的,所以你必须使用你自己的。我还使用 ES6、nodejs 6.5 版和 amqplib。使用保留字作为 header 名称给 headers x-
前缀 and/or 可能存在问题,但我不太确定(我必须看看RabbitMQ 源代码)。
emit.js:
#!/usr/bin/env node
const XCHANGE = 'headers-exchange';
const Q = require('q');
const Broker = require('amqplib');
let scope = 'anonymous';
process.on('uncaughtException', (exception) => {
console.error(`"::ERROR:: Uncaught exception ${exception}`);
});
process.argv.slice(2).forEach((arg) =>
{
scope = arg;
console.info('[*] Scope now set to ' + scope);
});
Q.spawn(function*()
{
let conn = yield Broker.connect('amqp://root:root@localhost');
let chan = yield conn.createChannel();
chan.assertExchange(XCHANGE, 'headers', { durable: false });
for(let count=0;; count=++count%3)
{
let output = (new Date()).toString();
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
console.log(`[x] Published item "${output}" to <${XCHANGE} : ${JSON.stringify(opts)}>`);
yield Q.delay(500);
}
});
receive.js:
#!/usr/bin/env node
const Q = require('q');
const Broker = require('amqplib');
const uuid = require('node-uuid');
const Rx = require('rx');
Rx.Node = require('rx-node');
const XCHANGE = 'headers-exchange';
const WORKER_ID = uuid.v4();
const WORKER_SHORT_ID = WORKER_ID.substr(0, 4);
Q.spawn(function*() {
let conn = yield Broker.connect('amqp://root:root@localhost');
let chan = yield conn.createChannel();
chan.assertExchange(XCHANGE, 'headers', { durable: false });
let q = yield chan.assertQueue('', { exclusive: true });
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
console.info('[*] Binding with ' + JSON.stringify(opts));
console.log(`[*] Subscriber ${WORKER_ID} (${WORKER_SHORT_ID}) is online!`);
chan.consume(q.queue, (msg) =>
{
console.info(`[x](${WORKER_SHORT_ID}) Received pub "${msg.content.toString()}"`);
chan.ack(msg);
});
});