如何使用 zmq (zeromq) 在两个 NodeJS 服务器之间发送多个请求和路由异步响应?

How can zmq (zeromq) be used to send multiple requests and route async responses between two NodeJS servers?

我有一个 NodeJS API 网络服务器(我们称它为 WS1),它接收来自客户端的 RESTful HTTP 请求,并响应需要首先查询另一个本地服务器(我们称它为 WS2) .

流程大致是这样的:

  1. WS1 从客户端接收 HTTP 请求并对其进行解析。
  2. WS1 向 WS2 发送请求以获取一些信息。
  3. 当 WS1 收到来自 WS2 的响应时,它完成对原始请求的处理并将响应发送回客户端。

到目前为止,WS1 和 WS2 之间的所有通信都是通过 HTTP 请求完成的,因为两台机器位于同一本地网络上。

虽然我正在考虑开始使用 zmq 来加快速度。我查看了他们在文档中显示的模式,但仍未找出并发问题。

WS1 每秒可以向 WS2 发送许多请求,并且不能保证 WS2 以与接收请求相同的顺序回复,因为一些异步操作在内部可能比其他操作花费更长的时间。

那么,在 NodeJS 中使用 zmq,我如何确保当 WS1 从 WS2 收到消息时它知道它属于哪个原始客户端请求?是否有内置机制来处理它?

谢谢!

0MQ 是一个有趣的工具集,有助于抽象套接字通信。有一些机制(你应该选择正确的套接字类型)允许服务器响应正确的客户端,并且它在 0mq 的范围内处理。

基本的 API 类型是:

  1. 推拉
  2. PUB-SUB
  3. 请求-回复

如果您希望能够让一台机器响应发起者,那么我相信您需要 REQ-REP api 类型。

然后您需要考虑每一侧的多路复用以使连接器正确。但为了简单起见,首先保持一对一:

示例客户端(来自 http://zguide.zeromq.org/js:rrclient

// Hello World client in Node.js
// Connects REQ socket to tcp://localhost:5559
// Sends "Hello" to server, expects "World" back

var zmq       = require('zmq')
  , requester = zmq.socket('req');

requester.connect('tcp://localhost:5559');
var replyNbr = 0;
requester.on('message', function(msg) {
  console.log('got reply', replyNbr, msg.toString());
  replyNbr += 1;
});

for (var i = 0; i < 10; ++i) {
  requester.send("Hello");
}

示例服务器(来自 http://zguide.zeromq.org/js:rrserver

// Hello World server in Node.js
// Connects REP socket to tcp://*:5560
// Expects "Hello" from client, replies with "World"

var zmq = require('zmq')
  , responder = zmq.socket('rep');

responder.connect('tcp://localhost:5560');
responder.on('message', function(msg) {

  console.log('received request:', msg.toString());
  setTimeout(function() {
    responder.send("World");
  }, 1000);
});

返回客户端的回复路由由 0MQ 自动处理。它是消息的一部分(虽然我不记得你是否在这些示例中看到了地址缓冲区——它可能被抽象掉了)。请求信封如下所示:

它是第一个框架,它允许 0MQ 能够回复正确的客户端。

一旦达到 运行,您就可以考虑 1..* *..1 和 ..。它真正做的只是要求您在适当的地方将套接字类型更改为 DEALER 和 ROUTER。

我最终实施了某种 "middleware" 来支持 zmq 的此功能。

在下面的示例中,为简单起见,我使用了 Express with Node >= v4.0.0 (supporting native JS promises), but you can obviously substitute it with any HTTP server you like (these days I prefer Koa) 和您喜欢的 promises 库。这是两个服务器的代码。

WS1(请求者)

var zmq = require('zmq');
var mem = {};
var requester = zmq.socket('req');
requester.on("message", function(reply) {
  reply = reply.toString().split('*');
  mem[reply.pop()](reply);
});
requester.connect("tcp://localhost:5555");


var app = require('express')();

app.get('/', function (req, res) {
  var id = Date.now() + Math.random();
  new Promise(function (resolve, reject) {
    mem[id] = function (reply) {
      reply[0] === 'success' ? resolve(reply[1]) : reject(reply[1]); 
    }
  })
  .then(function (data) {
    res.send(data);
  })
  .catch(function (err) {
    console.log(err);
    res.send(500);
  })
  requester.send(id + '*' + message);
});

var server = app.listen(3000);

WS2(响应者)

var zmq = require('zmq');
var responder = zmq.socket('rep');
responder.on('message', function(message) {
  message = message.split('*');
  var reqId = message[0];
  // Do whatever async stuff you need with message[1]
  // Then at the end of your callbacks you'll have something like this
  if (err) {
    responder.send('err' + '*' + JSON.stringify(err) + '*' + reqId);
  } else {
    responder.send('success' + '*' + JSON.stringify(yourData) + '*' + reqId);
  }
});
responder.bind('tcp://*:5555');