如何使用 Nodejs 在 RabbitMQ 中实现远程过程调用 (RPC)
how to implement Remote procedure call (RPC) in RabbitMQ using Nodejs
所以我想获取一个 Json 并将其解析为对象,然后实现一个 RPC RabbitMQ 服务器,以便我可以通过 RabbitMQ 将对象发送到服务器,然后对象将继续保存在一个本地数组中,一个通用唯一的 ID 将告诉对象存储的确切位置,将 return 通过 RPC 从该服务器发送到客户端。
官方网站展示了 RabbitMQ 中 RPC 的一些实现,在这里你可以找到他们的实现 https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html,在教程中他们发送一个数字,服务器将计算斐波那契数列和 return 结果给客户。相反,我想发送一个对象,而不是一个数字,我想接收我将存储在我的程序中的全局数组中的那个对象的通用唯一 id(uuid),我更改了代码以便它发送一个对象和return uuid 但它不起作用。我会感谢你们的帮助
//this is my server_rpc.js code :
const amqp = require('amqplib/callback_api');
const uuid = require("uuid/v1");
amqp.connect('here is the url: example: localhost', (err, conn) => {
conn.createChannel( (err, ch) => {
let q = 'rpc_queue';
ch.assertQueue(q, {durable: false});
ch.prefetch(10);
console.log(' [x] Waiting RPC requests');
ch.consume(q, function reply(msg) {
console.log("corralation key is: ", msg.properties.correlationId);
let n = uuid();
console.log(" data received ",JSON.parse(JSON.stringify(msg.content.toString())));
console.log("corralation key is: ", msg.properties.correlationId);
ch.sendToQueue(msg.properties.replyTo, Buffer.from(n.toString()), {correlationId: msg.properties.correlationId});
ch.ack(msg);
});
});
});
// and this is my client_rpc.js code :
const amqp = require('amqplib/callback_api');
const uuid = require("uuid/v1");
const express = require("express");
let data = {
"name" : "hil01",
"region" : "weissach",
"ID" : "1",
"version" : "0.0.1"
}
amqp.connect('url: example localhost ', (err, conn) => {
conn.createChannel( (err, ch) => {
ch.assertQueue('', {exclusive: true}, (err, q) => {
var corr = generateUuid();
var newHil = JSON.stringify(data);
console.log(" [x] Requesting uuid for the registered HIL: ", newHil );
console.log("corralation key is: ", corr);
ch.consume(q.queue, function(msg) {
if(msg.properties.correlationId == corr) {
console.log(" [.] Got %s", msg.content.toString());
setTimeout(() => { conn.close(); process.exit(0) }, 100);
}
}, {noAck: true});
ch.sendToQueue('rpc_queue', Buffer.from(newHil, {correlationId: corr, replyTo: q.queue }));
});
});
});
//method to generate the uuid, later will be replaced with the real
uuid function
var generateUuid = () => Math.random().toString() +
Math.random().toString() + Math.random().toString() ;
当我 运行 server_rpc, [x] waiting for requests 应该被打印然后在一个单独的 cmd i 运行 client_rpc.js 然后对象应该被发送并且服务器执行并 return 将 uuid 返回给客户端。
看来您在这里需要的是直接回复 RPC 模式:您想发送消息并获得响应。
这里是 RabbitMQ 上关于直接回复的文档:https://www.rabbitmq.com/direct-reply-to.html
TL;TR
这里是一个客户端和服务器的例子,可以开箱即用:
https://github.com/Igor-lkm/node-rabbitmq-rpc-direct-reply-to
里面是什么
安装 RabbitMQ 后,您将需要执行 2 个文件:
server.js
const amqp = require('amqplib');
const uuidv4 = require('uuid/v4');
const RABBITMQ = 'amqp://guest:guest@localhost:5672';
const open = require('amqplib').connect(RABBITMQ);
const q = 'example';
// Consumer
open
.then(function(conn) {
console.log(`[ ${new Date()} ] Server started`);
return conn.createChannel();
})
.then(function(ch) {
return ch.assertQueue(q).then(function(ok) {
return ch.consume(q, function(msg) {
console.log(
`[ ${new Date()} ] Message received: ${JSON.stringify(
JSON.parse(msg.content.toString('utf8')),
)}`,
);
if (msg !== null) {
const response = {
uuid: uuidv4(),
};
console.log(
`[ ${new Date()} ] Message sent: ${JSON.stringify(response)}`,
);
ch.sendToQueue(
msg.properties.replyTo,
Buffer.from(JSON.stringify(response)),
{
correlationId: msg.properties.correlationId,
},
);
ch.ack(msg);
}
});
});
})
.catch(console.warn);
client.js
const amqp = require('amqplib');
const EventEmitter = require('events');
const uuid = require('uuid');
const RABBITMQ = 'amqp://guest:guest@localhost:5672';
// pseudo-queue for direct reply-to
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
const q = 'example';
// Credits for Event Emitter goes to https://github.com/squaremo/amqp.node/issues/259
const createClient = rabbitmqconn =>
amqp
.connect(rabbitmqconn)
.then(conn => conn.createChannel())
.then(channel => {
channel.responseEmitter = new EventEmitter();
channel.responseEmitter.setMaxListeners(0);
channel.consume(
REPLY_QUEUE,
msg => {
channel.responseEmitter.emit(
msg.properties.correlationId,
msg.content.toString('utf8'),
);
},
{ noAck: true },
);
return channel;
});
const sendRPCMessage = (channel, message, rpcQueue) =>
new Promise(resolve => {
const correlationId = uuid.v4();
channel.responseEmitter.once(correlationId, resolve);
channel.sendToQueue(rpcQueue, Buffer.from(message), {
correlationId,
replyTo: REPLY_QUEUE,
});
});
const init = async () => {
const channel = await createClient(RABBITMQ);
const message = { uuid: uuid.v4() };
console.log(`[ ${new Date()} ] Message sent: ${JSON.stringify(message)}`);
const respone = await sendRPCMessage(channel, JSON.stringify(message), q);
console.log(`[ ${new Date()} ] Message received: ${respone}`);
process.exit();
};
try {
init();
} catch (e) {
console.log(e);
}
你会得到一个结果,比如:
所以我想获取一个 Json 并将其解析为对象,然后实现一个 RPC RabbitMQ 服务器,以便我可以通过 RabbitMQ 将对象发送到服务器,然后对象将继续保存在一个本地数组中,一个通用唯一的 ID 将告诉对象存储的确切位置,将 return 通过 RPC 从该服务器发送到客户端。
官方网站展示了 RabbitMQ 中 RPC 的一些实现,在这里你可以找到他们的实现 https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html,在教程中他们发送一个数字,服务器将计算斐波那契数列和 return 结果给客户。相反,我想发送一个对象,而不是一个数字,我想接收我将存储在我的程序中的全局数组中的那个对象的通用唯一 id(uuid),我更改了代码以便它发送一个对象和return uuid 但它不起作用。我会感谢你们的帮助
//this is my server_rpc.js code :
const amqp = require('amqplib/callback_api');
const uuid = require("uuid/v1");
amqp.connect('here is the url: example: localhost', (err, conn) => {
conn.createChannel( (err, ch) => {
let q = 'rpc_queue';
ch.assertQueue(q, {durable: false});
ch.prefetch(10);
console.log(' [x] Waiting RPC requests');
ch.consume(q, function reply(msg) {
console.log("corralation key is: ", msg.properties.correlationId);
let n = uuid();
console.log(" data received ",JSON.parse(JSON.stringify(msg.content.toString())));
console.log("corralation key is: ", msg.properties.correlationId);
ch.sendToQueue(msg.properties.replyTo, Buffer.from(n.toString()), {correlationId: msg.properties.correlationId});
ch.ack(msg);
});
});
});
// and this is my client_rpc.js code :
const amqp = require('amqplib/callback_api');
const uuid = require("uuid/v1");
const express = require("express");
let data = {
"name" : "hil01",
"region" : "weissach",
"ID" : "1",
"version" : "0.0.1"
}
amqp.connect('url: example localhost ', (err, conn) => {
conn.createChannel( (err, ch) => {
ch.assertQueue('', {exclusive: true}, (err, q) => {
var corr = generateUuid();
var newHil = JSON.stringify(data);
console.log(" [x] Requesting uuid for the registered HIL: ", newHil );
console.log("corralation key is: ", corr);
ch.consume(q.queue, function(msg) {
if(msg.properties.correlationId == corr) {
console.log(" [.] Got %s", msg.content.toString());
setTimeout(() => { conn.close(); process.exit(0) }, 100);
}
}, {noAck: true});
ch.sendToQueue('rpc_queue', Buffer.from(newHil, {correlationId: corr, replyTo: q.queue }));
});
});
});
//method to generate the uuid, later will be replaced with the real
uuid function
var generateUuid = () => Math.random().toString() +
Math.random().toString() + Math.random().toString() ;
当我 运行 server_rpc, [x] waiting for requests 应该被打印然后在一个单独的 cmd i 运行 client_rpc.js 然后对象应该被发送并且服务器执行并 return 将 uuid 返回给客户端。
看来您在这里需要的是直接回复 RPC 模式:您想发送消息并获得响应。
这里是 RabbitMQ 上关于直接回复的文档:https://www.rabbitmq.com/direct-reply-to.html
TL;TR
这里是一个客户端和服务器的例子,可以开箱即用:
https://github.com/Igor-lkm/node-rabbitmq-rpc-direct-reply-to
里面是什么
安装 RabbitMQ 后,您将需要执行 2 个文件:
server.js
const amqp = require('amqplib');
const uuidv4 = require('uuid/v4');
const RABBITMQ = 'amqp://guest:guest@localhost:5672';
const open = require('amqplib').connect(RABBITMQ);
const q = 'example';
// Consumer
open
.then(function(conn) {
console.log(`[ ${new Date()} ] Server started`);
return conn.createChannel();
})
.then(function(ch) {
return ch.assertQueue(q).then(function(ok) {
return ch.consume(q, function(msg) {
console.log(
`[ ${new Date()} ] Message received: ${JSON.stringify(
JSON.parse(msg.content.toString('utf8')),
)}`,
);
if (msg !== null) {
const response = {
uuid: uuidv4(),
};
console.log(
`[ ${new Date()} ] Message sent: ${JSON.stringify(response)}`,
);
ch.sendToQueue(
msg.properties.replyTo,
Buffer.from(JSON.stringify(response)),
{
correlationId: msg.properties.correlationId,
},
);
ch.ack(msg);
}
});
});
})
.catch(console.warn);
client.js
const amqp = require('amqplib');
const EventEmitter = require('events');
const uuid = require('uuid');
const RABBITMQ = 'amqp://guest:guest@localhost:5672';
// pseudo-queue for direct reply-to
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
const q = 'example';
// Credits for Event Emitter goes to https://github.com/squaremo/amqp.node/issues/259
const createClient = rabbitmqconn =>
amqp
.connect(rabbitmqconn)
.then(conn => conn.createChannel())
.then(channel => {
channel.responseEmitter = new EventEmitter();
channel.responseEmitter.setMaxListeners(0);
channel.consume(
REPLY_QUEUE,
msg => {
channel.responseEmitter.emit(
msg.properties.correlationId,
msg.content.toString('utf8'),
);
},
{ noAck: true },
);
return channel;
});
const sendRPCMessage = (channel, message, rpcQueue) =>
new Promise(resolve => {
const correlationId = uuid.v4();
channel.responseEmitter.once(correlationId, resolve);
channel.sendToQueue(rpcQueue, Buffer.from(message), {
correlationId,
replyTo: REPLY_QUEUE,
});
});
const init = async () => {
const channel = await createClient(RABBITMQ);
const message = { uuid: uuid.v4() };
console.log(`[ ${new Date()} ] Message sent: ${JSON.stringify(message)}`);
const respone = await sendRPCMessage(channel, JSON.stringify(message), q);
console.log(`[ ${new Date()} ] Message received: ${respone}`);
process.exit();
};
try {
init();
} catch (e) {
console.log(e);
}
你会得到一个结果,比如: