使用 RabbitMQ 时为每个远程方法创建一个队列?

Creating a queue per remote method when using RabbitMQ?

让我们暂时接受通过消息队列(如 RabbitMQ)实现 RPC 并不是一个可怕的想法 -- 有时在与遗留系统交互时可能是必要的。

在 RPC over RabbitMQ 的情况下,客户端将消息发送到代理,代理将消息路由到工作人员,工作人员 returns 将结果通过代理发送到客户端。但是,如果一个 worker 实现了多个远程方法,那么不同的调用需要以某种方式路由到不同的侦听器。

这种情况下的一般做法是什么?所有 RPC over MQ 示例仅显示一种远程方法。将方法名称设置为路由名称会很好也很容易rule/queue,但我不知道这是否是正确的方法。

Let's just accept for a moment that it is not a horrible idea to implement RPC over message queues (like RabbitMQ)

一点都不可怕!它很常见,并且在许多情况下都被推荐 - 不仅仅是遗留集成。

...好的,现在开始回答你的实际问题:)


从非常高的角度来看,这是您需要做的。

您的请求和响应需要包含两个关键信息:

  • 一个correlation-id
  • 一个reply-to队列

这些信息将使您能够关联原始请求和响应。

发送请求之前

让您的请求代码为自己创建一个独占队列。该队列将用于接收回复。

创建一个新的相关 ID - 通常是 GUID 或 UUID 以保证唯一性。

发送请求时

将您生成的关联 ID 附加到消息属性。为此,您应该使用 correlationId 属性。

将关联 ID 与请求的关联回调函数(回复处理程序)存储在发出请求的代码内部的某处。当收到回复时,您将需要这样做。

将您创建的独占队列的名称也附加到消息的 replyTo 属性 中。

完成所有这些后,您可以通过 rabbitmq 发送消息

回复时

回复代码需要使用原始消息中的 correlationIdreplyTo 字段。所以一定要抓住那些

回复应直接发送到 replyTo 队列。不要通过交换使用标准发布。相反,使用您正在使用的任何库的 "send to queue" 功能将回复消息直接发送到队列,并将响应直接发送到 replyTo 队列。

一定要在回复中包含 correlationId。这是回答您问题的关键部分

处理回复时

发出原始请求的代码将从 replyTo 队列接收消息。然后它将 correlationId 从消息属性中拉出。

使用关联 ID 查找请求的回调方法...处理响应的代码。将消息传递给此回调方法,您就大功告成了。

实施细节

从高层次的角度来看,这是有效的。当您深入研究代码时,实现细节将根据您使用的语言和驱动程序/库而有所不同。

大多数适用于任何给定语言的优秀 RabbitMQ 库都将内置 Request/Response。如果你的没有,你可能想找一个不同的图书馆。除非您在 AMQP 协议之上编写基于模式的库,否则您应该寻找一个为您实现了通用模式的库。

如果您需要有关 Request/Reply 模式的更多信息,包括我在此处提供的所有详细信息(以及更多),请查看这些资源:

如果您在 Node.js 工作,我建议您使用 wascally library, which includes the Request/Reply feature you need. For Ruby, check out bunny。对于 Java 或 .NET,请查看周围众多服务总线实现中的一些。在 .NET 中,我推荐 NServiceBus 或 MassTransit。

我发现每个请求使用一个新的回复队列会变得非常低效,特别是当 运行 集群上的 RabbitMQ 时。

正如评论中所建议的那样 direct reply-to seems to be the way to go. I've documented here 我在选择那个之前尝试过的所有选项。

我写了一个 npm 包 amq.rabbitmq.reply-to.js 那:

用法:

const rabbitmqreplyto = require('amq.rabbitmq.reply-to.js');

const serverCallbackTimesTen = (message, rpcServer) => {
    const n = parseInt(message);
    return Promise.resolve(`${n * 10}`);
};

let rpcServer;
let rpcClient;
Promise.resolve().then(() => {
    const serverOptions = new rabbitmqreplyto.RpcServerOptions(
    /* url */ undefined, 
    /* serverId */ undefined, 
    /* callback */ serverCallbackTimesTen);

    return rabbitmqreplyto.RpcServer.Create(serverOptions);
}).then((rpcServerP) => {
    rpcServer = rpcServerP;
    return rabbitmqreplyto.RpcClient.Create();
}).then((rpcClientP) => {
    rpcClient = rpcClientP;
    const promises = [];
    for (let i = 1; i <= 20; i++) {
        promises.push(rpcClient.sendRPCMessage(`${i}`));
    }
    return Promise.all(promises);
}).then((replies) => {
    console.log(replies);
    return Promise.all([rpcServer.Close(), rpcClient.Close()]);
});

//['10',
//  '20',
//  '30',
//  '40',
//  '50',
//  '60',
//  '70',
//  '80',
//  '90',
//  '100',
//  '110',
//  '120',
//  '130',
//  '140',
//  '150',
//  '160',
//  '170',
//  '180',
//  '190',
//  '200']