Node.js RabbitMQ:消费消息并对每条消息进行处理
Node RabbitMQ consume message and do something for each message
我想消费来自 RabbitMQ 服务的消息,并对收到的每条消息做一些处理(例如:将该消息存入数据库、处理消息,然后通过 RabbitMQ 的另一个队列发送回复信息)。
目前我的RabbitMq消费者代码如下:
const all = require('bluebird').all;
const basename = require('path').basename;
/**
 * Connects to RabbitMQ, binds the `test` queue to the `test-exchange` direct
 * exchange once per severity given on the command line, and logs every
 * message delivered to that queue.
 *
 * Severities (used as binding keys) are taken from `process.argv.slice(2)`.
 * When none are supplied a usage line is printed and the process exits with
 * status 1. Connection/channel failures are reported through `console.warn`.
 *
 * NOTE(review): this function relies on `amqp` being in scope; the require
 * for it is not shown next to this function — confirm it is loaded.
 */
function receive() {
  const severities = process.argv.slice(2);
  if (severities.length < 1) {
    console.warn('Usage: %s [info] [warning] [error]',
      basename(process.argv[1]));
    process.exit(1);
  }

  const config = {
    protocol: 'amqp',
    hostname: 'localhost',
    port: 5672,
    username: 'rumesh',
    password: 'password',
    locale: 'en_US',
    frameMax: 0,
    heartbeat: 0,
    vhost: '/',
  };

  const queue = 'test';
  const exchange = 'test-exchange';
  const exchangeType = 'direct';

  // Prints the routing key and body of one delivered message.
  function logMessage(msg) {
    // amqplib documents that the callback receives null when the broker
    // cancels the consumer; there is nothing to print in that case.
    if (msg === null) {
      console.warn(' [!] Consumer cancelled by the server.');
      return;
    }
    console.log(" [x] %s:'%s'",
      msg.fields.routingKey,
      msg.content.toString());
  }

  amqp.connect(config).then(function (conn) {
    process.once('SIGINT', function () {
      conn.close();
    });
    return conn.createChannel().then(function (ch) {
      return ch.assertExchange(exchange, exchangeType, { durable: true })
        .then(function () {
          return ch.assertQueue(queue, { durable: true });
        })
        .then(function (qok) {
          const boundQueue = qok.queue;
          return all(severities.map(function (sev) {
            // Return the promise so `all` really waits for every binding
            // (and surfaces binding errors) before consumption starts.
            // NOTE(review): the fourth bindQueue argument is documented by
            // amqplib as binding `args`, not options — confirm that
            // `{durable: true}` is intended here.
            return ch.bindQueue(boundQueue, exchange, sev, { durable: true });
          })).then(function () {
            return boundQueue;
          });
        })
        .then(function (boundQueue) {
          return ch.consume(boundQueue, logMessage, { noAck: true });
        })
        .then(function () {
          console.log(' [*] Waiting for logs. To exit press CTRL+C.');
        });
    });
  }).catch(console.warn);
}
module.exports = receive;
我建议您创建一个像 onNewMessage 这样的处理程序函数,每次您在队列中收到新消息时都会调用它。
您可以通过多种方式对消息进行编码,因为您可以通过 AMQP 发送二进制数据。
JSON 无疑是发送消息的一种方式,在 Node.js 中处理起来也很方便。
下面是一些连接到服务器然后发送和接收消息的示例代码:
const amqp = require('amqplib');
// Name of the queue this example declares, publishes to and consumes from.
const queue = 'test';

// Connection settings for the AMQP server; set your own values here.
// NOTE(review): frameMax/heartbeat of 0 presumably mean "no limit"/"disabled"
// — confirm against the amqplib connect() documentation.
let config = {
  protocol: 'amqp',
  hostname: 'localhost',
  port: 5672,
  username: 'rumesh',
  password: 'password',
  locale: 'en_US',
  frameMax: 0,
  heartbeat: 0,
  vhost: '/',
};
/**
 * Entry point of the example: opens the connection, creates a channel,
 * declares the durable queue, then starts the consumer and the periodic
 * publisher. Any failure along the way is logged, not rethrown.
 *
 * @returns {Promise<void>}
 */
async function start() {
  try {
    const connection = await createConnection(config);
    console.log("Connected to AMQP server.");

    const ch = await connection.createChannel();
    const queueOptions = { durable: true };
    await ch.assertQueue(queue, queueOptions);

    startPollingForMessages(ch);
    startSendingMessages(ch);
  } catch (failure) {
    console.error("start: Connection error:", failure.message);
  }
}
/**
 * Opens an AMQP connection and attaches logging listeners for its
 * "error" and "close" events.
 *
 * @param {object} config - Connection settings passed straight to amqp.connect().
 * @returns {Promise<object>} The connection object resolved by amqp.connect().
 */
async function createConnection(config) {
  const conn = await amqp.connect(config);
  conn.on("error", function (err) {
    console.error("Connection error:", err.message);
  });
  // The handler must declare `err` itself; reading an undeclared `err` here
  // throws a ReferenceError when the event fires. "close" can also fire
  // without an error (e.g. after a deliberate close), so guard before
  // touching `.message`.
  conn.on("close", function (err) {
    console.error("Connection closed:", err ? err.message : "no error reported");
  });
  return conn;
}
/**
 * Publishes a small JSON test message to `queue` every 5 seconds.
 *
 * @param {object} channel - Channel handed on to sendMessage().
 * @returns {*} The handle returned by setInterval, so callers can stop the
 *   publisher with clearInterval().
 */
function startSendingMessages(channel) {
  const SEND_INTERVAL = 5000;
  return setInterval(() => {
    const body = JSON.stringify({ timestamp: new Date().toISOString(), message: " Some message" });
    // sendMessage is async; without a rejection handler a failed send would
    // surface as an unhandled promise rejection from inside the timer.
    sendMessage(channel, queue, body).catch((err) => {
      console.error("startSendingMessages: send failed:", err.message);
    });
  }, SEND_INTERVAL);
}
/**
 * Logs the outgoing text, then publishes it to the given queue as a Buffer.
 *
 * @param {object} channel - Object exposing sendToQueue(queue, buffer).
 * @param {string} queue - Name of the destination queue.
 * @param {string} messageContent - Message text; converted with Buffer.from().
 * @returns {Promise<*>} Resolves with whatever channel.sendToQueue() returned.
 */
async function sendMessage(channel, queue, messageContent) {
  console.log(`sendMessage: sending message: ${messageContent}...`);
  const payload = Buffer.from(messageContent);
  const outcome = channel.sendToQueue(queue, payload);
  return outcome;
}
/**
 * Registers a consumer on `queue` that hands each delivery to onNewMessage()
 * and then acknowledges it.
 *
 * @param {object} ch - Channel exposing consume(queue, callback) and ack(msg).
 * @returns {*} Whatever ch.consume() returns, so callers can await
 *   registration or handle its failure.
 */
function startPollingForMessages(ch) {
  return ch.consume(queue, (msg) => {
    // amqplib documents that the callback is invoked with null when the
    // broker cancels the consumer; there is nothing to process or ack then.
    if (msg === null) {
      console.warn("startPollingForMessages: consumer cancelled by the server");
      return;
    }
    onNewMessage(msg);
    ch.ack(msg);
  });
}
/**
 * Handler invoked for each consumed message; currently it only logs the
 * message body as text. Put database or other per-message work here.
 *
 * @param {object} msg - Delivery whose `content` supports toString().
 */
function onNewMessage(msg) {
  const bodyText = msg.content.toString();
  console.log("On new message:", bodyText);
}
// Run the example: connect, declare the queue, then begin consuming and publishing.
start();
我想消费来自 RabbitMQ 服务的消息,并对收到的每条消息做一些处理(例如:将该消息存入数据库、处理消息,然后通过 RabbitMQ 的另一个队列发送回复信息)。
目前我的RabbitMq消费者代码如下:
const all = require('bluebird').all;
const basename = require('path').basename;
/**
 * Connects to RabbitMQ, binds the `test` queue to the `test-exchange` direct
 * exchange once per severity given on the command line, and logs every
 * message delivered to that queue.
 *
 * Severities (used as binding keys) are taken from `process.argv.slice(2)`.
 * When none are supplied a usage line is printed and the process exits with
 * status 1. Connection/channel failures are reported through `console.warn`.
 *
 * NOTE(review): this function relies on `amqp` being in scope; the require
 * for it is not shown next to this function — confirm it is loaded.
 */
function receive() {
  const severities = process.argv.slice(2);
  if (severities.length < 1) {
    console.warn('Usage: %s [info] [warning] [error]',
      basename(process.argv[1]));
    process.exit(1);
  }

  const config = {
    protocol: 'amqp',
    hostname: 'localhost',
    port: 5672,
    username: 'rumesh',
    password: 'password',
    locale: 'en_US',
    frameMax: 0,
    heartbeat: 0,
    vhost: '/',
  };

  const queue = 'test';
  const exchange = 'test-exchange';
  const exchangeType = 'direct';

  // Prints the routing key and body of one delivered message.
  function logMessage(msg) {
    // amqplib documents that the callback receives null when the broker
    // cancels the consumer; there is nothing to print in that case.
    if (msg === null) {
      console.warn(' [!] Consumer cancelled by the server.');
      return;
    }
    console.log(" [x] %s:'%s'",
      msg.fields.routingKey,
      msg.content.toString());
  }

  amqp.connect(config).then(function (conn) {
    process.once('SIGINT', function () {
      conn.close();
    });
    return conn.createChannel().then(function (ch) {
      return ch.assertExchange(exchange, exchangeType, { durable: true })
        .then(function () {
          return ch.assertQueue(queue, { durable: true });
        })
        .then(function (qok) {
          const boundQueue = qok.queue;
          return all(severities.map(function (sev) {
            // Return the promise so `all` really waits for every binding
            // (and surfaces binding errors) before consumption starts.
            // NOTE(review): the fourth bindQueue argument is documented by
            // amqplib as binding `args`, not options — confirm that
            // `{durable: true}` is intended here.
            return ch.bindQueue(boundQueue, exchange, sev, { durable: true });
          })).then(function () {
            return boundQueue;
          });
        })
        .then(function (boundQueue) {
          return ch.consume(boundQueue, logMessage, { noAck: true });
        })
        .then(function () {
          console.log(' [*] Waiting for logs. To exit press CTRL+C.');
        });
    });
  }).catch(console.warn);
}
module.exports = receive;
我建议您创建一个像 onNewMessage 这样的处理程序函数,每次您在队列中收到新消息时都会调用它。
您可以通过多种方式对消息进行编码,因为您可以通过 AMQP 发送二进制数据。
JSON 无疑是发送消息的一种方式,在 Node.js 中处理起来也很方便。
下面是一些连接到服务器然后发送和接收消息的示例代码:
const amqp = require('amqplib');
// Name of the queue this example declares, publishes to and consumes from.
const queue = 'test';

// Connection settings for the AMQP server; set your own values here.
// NOTE(review): frameMax/heartbeat of 0 presumably mean "no limit"/"disabled"
// — confirm against the amqplib connect() documentation.
let config = {
  protocol: 'amqp',
  hostname: 'localhost',
  port: 5672,
  username: 'rumesh',
  password: 'password',
  locale: 'en_US',
  frameMax: 0,
  heartbeat: 0,
  vhost: '/',
};
/**
 * Entry point of the example: opens the connection, creates a channel,
 * declares the durable queue, then starts the consumer and the periodic
 * publisher. Any failure along the way is logged, not rethrown.
 *
 * @returns {Promise<void>}
 */
async function start() {
  try {
    const connection = await createConnection(config);
    console.log("Connected to AMQP server.");

    const ch = await connection.createChannel();
    const queueOptions = { durable: true };
    await ch.assertQueue(queue, queueOptions);

    startPollingForMessages(ch);
    startSendingMessages(ch);
  } catch (failure) {
    console.error("start: Connection error:", failure.message);
  }
}
/**
 * Opens an AMQP connection and attaches logging listeners for its
 * "error" and "close" events.
 *
 * @param {object} config - Connection settings passed straight to amqp.connect().
 * @returns {Promise<object>} The connection object resolved by amqp.connect().
 */
async function createConnection(config) {
  const conn = await amqp.connect(config);
  conn.on("error", function (err) {
    console.error("Connection error:", err.message);
  });
  // The handler must declare `err` itself; reading an undeclared `err` here
  // throws a ReferenceError when the event fires. "close" can also fire
  // without an error (e.g. after a deliberate close), so guard before
  // touching `.message`.
  conn.on("close", function (err) {
    console.error("Connection closed:", err ? err.message : "no error reported");
  });
  return conn;
}
/**
 * Publishes a small JSON test message to `queue` every 5 seconds.
 *
 * @param {object} channel - Channel handed on to sendMessage().
 * @returns {*} The handle returned by setInterval, so callers can stop the
 *   publisher with clearInterval().
 */
function startSendingMessages(channel) {
  const SEND_INTERVAL = 5000;
  return setInterval(() => {
    const body = JSON.stringify({ timestamp: new Date().toISOString(), message: " Some message" });
    // sendMessage is async; without a rejection handler a failed send would
    // surface as an unhandled promise rejection from inside the timer.
    sendMessage(channel, queue, body).catch((err) => {
      console.error("startSendingMessages: send failed:", err.message);
    });
  }, SEND_INTERVAL);
}
/**
 * Logs the outgoing text, then publishes it to the given queue as a Buffer.
 *
 * @param {object} channel - Object exposing sendToQueue(queue, buffer).
 * @param {string} queue - Name of the destination queue.
 * @param {string} messageContent - Message text; converted with Buffer.from().
 * @returns {Promise<*>} Resolves with whatever channel.sendToQueue() returned.
 */
async function sendMessage(channel, queue, messageContent) {
  console.log(`sendMessage: sending message: ${messageContent}...`);
  const payload = Buffer.from(messageContent);
  const outcome = channel.sendToQueue(queue, payload);
  return outcome;
}
/**
 * Registers a consumer on `queue` that hands each delivery to onNewMessage()
 * and then acknowledges it.
 *
 * @param {object} ch - Channel exposing consume(queue, callback) and ack(msg).
 * @returns {*} Whatever ch.consume() returns, so callers can await
 *   registration or handle its failure.
 */
function startPollingForMessages(ch) {
  return ch.consume(queue, (msg) => {
    // amqplib documents that the callback is invoked with null when the
    // broker cancels the consumer; there is nothing to process or ack then.
    if (msg === null) {
      console.warn("startPollingForMessages: consumer cancelled by the server");
      return;
    }
    onNewMessage(msg);
    ch.ack(msg);
  });
}
/**
 * Handler invoked for each consumed message; currently it only logs the
 * message body as text. Put database or other per-message work here.
 *
 * @param {object} msg - Delivery whose `content` supports toString().
 */
function onNewMessage(msg) {
  const bodyText = msg.content.toString();
  console.log("On new message:", bodyText);
}
// Run the example: connect, declare the queue, then begin consuming and publishing.
start();