使用 RabbitMQ 时为每个远程方法创建一个队列?
Creating a queue per remote method when using RabbitMQ?
让我们暂时接受通过消息队列(如 RabbitMQ)实现 RPC 并不是一个可怕的想法 -- 有时在与遗留系统交互时可能是必要的。
在 RPC over RabbitMQ 的情况下,客户端将消息发送到代理,代理将消息路由到工作人员,工作人员 returns 将结果通过代理发送到客户端。但是,如果一个 worker 实现了多个远程方法,那么不同的调用需要以某种方式路由到不同的侦听器。
这种情况下的一般做法是什么?所有 RPC over MQ 示例仅显示一种远程方法。将方法名称设置为路由名称会很好也很容易rule/queue,但我不知道这是否是正确的方法。
Let's just accept for a moment that it is not a horrible idea to implement RPC over message queues (like RabbitMQ)
一点都不可怕!它很常见,并且在许多情况下都被推荐 - 不仅仅是遗留集成。
...好的,现在开始回答你的实际问题:)
从非常高的角度来看,这是您需要做的。
您的请求和响应需要包含两个关键信息:
- 一个
correlation-id
- 一个
reply-to
队列
这些信息将使您能够关联原始请求和响应。
发送请求之前
让您的请求代码为自己创建一个独占队列。该队列将用于接收回复。
创建一个新的相关 ID - 通常是 GUID 或 UUID 以保证唯一性。
发送请求时
将您生成的关联 ID 附加到消息属性。为此,您应该使用 correlationId
属性。
将关联 ID 与请求的关联回调函数(回复处理程序)存储在发出请求的代码内部的某处。当收到回复时,您将需要这样做。
将您创建的独占队列的名称也附加到消息的 replyTo
属性 中。
完成所有这些后,您可以通过 rabbitmq 发送消息
回复时
回复代码需要使用原始消息中的 correlationId
和 replyTo
字段。所以一定要抓住那些
回复应直接发送到 replyTo
队列。不要通过交换使用标准发布。相反,使用您正在使用的任何库的 "send to queue" 功能将回复消息直接发送到队列,并将响应直接发送到 replyTo
队列。
一定要在回复中包含 correlationId
。这是回答您问题的关键部分
处理回复时
发出原始请求的代码将从 replyTo
队列接收消息。然后它将 correlationId
从消息属性中拉出。
使用关联 ID 查找请求的回调方法...处理响应的代码。将消息传递给此回调方法,您就大功告成了。
实施细节
从高层次的角度来看,这是有效的。当您深入研究代码时,实现细节将根据您使用的语言和驱动程序/库而有所不同。
大多数适用于任何给定语言的优秀 RabbitMQ 库都将内置 Request/Response。如果你的没有,你可能想找一个不同的图书馆。除非您在 AMQP 协议之上编写基于模式的库,否则您应该寻找一个为您实现了通用模式的库。
如果您需要有关 Request/Reply 模式的更多信息,包括我在此处提供的所有详细信息(以及更多),请查看这些资源:
- 我自己的 RabbitMQ Patterns 电子邮件课程/电子书
- RabbitMQ Tutorials
- Enterprise Integration Patterns - 一定要购买完整描述/实现模式的书。值得拥有这本书
如果您在 Node.js 工作,我建议您使用 wascally library, which includes the Request/Reply feature you need. For Ruby, check out bunny。对于 Java 或 .NET,请查看周围众多服务总线实现中的一些。在 .NET 中,我推荐 NServiceBus 或 MassTransit。
我发现每个请求使用一个新的回复队列会变得非常低效,特别是当 运行 集群上的 RabbitMQ 时。
正如评论中所建议的那样 direct reply-to seems to be the way to go. I've documented here 我在选择那个之前尝试过的所有选项。
我写了一个 npm 包 amq.rabbitmq.reply-to.js 那:
使用直接回复 - 这一功能允许 RPC (request/reply) 客户端使用类似于教程 6 (https://www.rabbitmq.com/direct-reply-to.html) 中演示的设计来避免声明一个每个请求的响应队列。
创建一个事件发射器,其中 rpc 响应将由 correlationId 发布
根据 https://github.com/squaremo/amqp.node/issues/259#issuecomment-230165144
的建议
用法:
const rabbitmqreplyto = require('amq.rabbitmq.reply-to.js');
const serverCallbackTimesTen = (message, rpcServer) => {
const n = parseInt(message);
return Promise.resolve(`${n * 10}`);
};
let rpcServer;
let rpcClient;
Promise.resolve().then(() => {
const serverOptions = new rabbitmqreplyto.RpcServerOptions(
/* url */ undefined,
/* serverId */ undefined,
/* callback */ serverCallbackTimesTen);
return rabbitmqreplyto.RpcServer.Create(serverOptions);
}).then((rpcServerP) => {
rpcServer = rpcServerP;
return rabbitmqreplyto.RpcClient.Create();
}).then((rpcClientP) => {
rpcClient = rpcClientP;
const promises = [];
for (let i = 1; i <= 20; i++) {
promises.push(rpcClient.sendRPCMessage(`${i}`));
}
return Promise.all(promises);
}).then((replies) => {
console.log(replies);
return Promise.all([rpcServer.Close(), rpcClient.Close()]);
});
//['10',
// '20',
// '30',
// '40',
// '50',
// '60',
// '70',
// '80',
// '90',
// '100',
// '110',
// '120',
// '130',
// '140',
// '150',
// '160',
// '170',
// '180',
// '190',
// '200']
让我们暂时接受通过消息队列(如 RabbitMQ)实现 RPC 并不是一个可怕的想法 -- 有时在与遗留系统交互时可能是必要的。
在 RPC over RabbitMQ 的情况下,客户端将消息发送到代理,代理将消息路由到工作人员,工作人员 returns 将结果通过代理发送到客户端。但是,如果一个 worker 实现了多个远程方法,那么不同的调用需要以某种方式路由到不同的侦听器。
这种情况下的一般做法是什么?所有 RPC over MQ 示例仅显示一种远程方法。将方法名称设置为路由名称会很好也很容易rule/queue,但我不知道这是否是正确的方法。
Let's just accept for a moment that it is not a horrible idea to implement RPC over message queues (like RabbitMQ)
一点都不可怕!它很常见,并且在许多情况下都被推荐 - 不仅仅是遗留集成。
...好的,现在开始回答你的实际问题:)
从非常高的角度来看,这是您需要做的。
您的请求和响应需要包含两个关键信息:
- 一个
correlation-id
- 一个
reply-to
队列
这些信息将使您能够关联原始请求和响应。
发送请求之前
让您的请求代码为自己创建一个独占队列。该队列将用于接收回复。
创建一个新的相关 ID - 通常是 GUID 或 UUID 以保证唯一性。
发送请求时
将您生成的关联 ID 附加到消息属性。为此,您应该使用 correlationId
属性。
将关联 ID 与请求的关联回调函数(回复处理程序)存储在发出请求的代码内部的某处。当收到回复时,您将需要这样做。
将您创建的独占队列的名称也附加到消息的 replyTo
属性 中。
完成所有这些后,您可以通过 rabbitmq 发送消息
回复时
回复代码需要使用原始消息中的 correlationId
和 replyTo
字段。所以一定要抓住那些
回复应直接发送到 replyTo
队列。不要通过交换使用标准发布。相反,使用您正在使用的任何库的 "send to queue" 功能将回复消息直接发送到队列,并将响应直接发送到 replyTo
队列。
一定要在回复中包含 correlationId
。这是回答您问题的关键部分
处理回复时
发出原始请求的代码将从 replyTo
队列接收消息。然后它将 correlationId
从消息属性中拉出。
使用关联 ID 查找请求的回调方法...处理响应的代码。将消息传递给此回调方法,您就大功告成了。
实施细节
从高层次的角度来看,这是有效的。当您深入研究代码时,实现细节将根据您使用的语言和驱动程序/库而有所不同。
大多数适用于任何给定语言的优秀 RabbitMQ 库都将内置 Request/Response。如果你的没有,你可能想找一个不同的图书馆。除非您在 AMQP 协议之上编写基于模式的库,否则您应该寻找一个为您实现了通用模式的库。
如果您需要有关 Request/Reply 模式的更多信息,包括我在此处提供的所有详细信息(以及更多),请查看这些资源:
- 我自己的 RabbitMQ Patterns 电子邮件课程/电子书
- RabbitMQ Tutorials
- Enterprise Integration Patterns - 一定要购买完整描述/实现模式的书。值得拥有这本书
如果您在 Node.js 工作,我建议您使用 wascally library, which includes the Request/Reply feature you need. For Ruby, check out bunny。对于 Java 或 .NET,请查看周围众多服务总线实现中的一些。在 .NET 中,我推荐 NServiceBus 或 MassTransit。
我发现每个请求使用一个新的回复队列会变得非常低效,特别是当 运行 集群上的 RabbitMQ 时。
正如评论中所建议的那样 direct reply-to seems to be the way to go. I've documented here 我在选择那个之前尝试过的所有选项。
我写了一个 npm 包 amq.rabbitmq.reply-to.js 那:
使用直接回复 - 这一功能允许 RPC (request/reply) 客户端使用类似于教程 6 (https://www.rabbitmq.com/direct-reply-to.html) 中演示的设计来避免声明一个每个请求的响应队列。
创建一个事件发射器,其中 rpc 响应将由 correlationId 发布 根据 https://github.com/squaremo/amqp.node/issues/259#issuecomment-230165144
的建议
用法:
const rabbitmqreplyto = require('amq.rabbitmq.reply-to.js');
const serverCallbackTimesTen = (message, rpcServer) => {
const n = parseInt(message);
return Promise.resolve(`${n * 10}`);
};
let rpcServer;
let rpcClient;
Promise.resolve().then(() => {
const serverOptions = new rabbitmqreplyto.RpcServerOptions(
/* url */ undefined,
/* serverId */ undefined,
/* callback */ serverCallbackTimesTen);
return rabbitmqreplyto.RpcServer.Create(serverOptions);
}).then((rpcServerP) => {
rpcServer = rpcServerP;
return rabbitmqreplyto.RpcClient.Create();
}).then((rpcClientP) => {
rpcClient = rpcClientP;
const promises = [];
for (let i = 1; i <= 20; i++) {
promises.push(rpcClient.sendRPCMessage(`${i}`));
}
return Promise.all(promises);
}).then((replies) => {
console.log(replies);
return Promise.all([rpcServer.Close(), rpcClient.Close()]);
});
//['10',
// '20',
// '30',
// '40',
// '50',
// '60',
// '70',
// '80',
// '90',
// '100',
// '110',
// '120',
// '130',
// '140',
// '150',
// '160',
// '170',
// '180',
// '190',
// '200']