NodeJS 事件发射器阻塞问题

NodeJS Event Emitter Blocking Issue

我有一个节点应用程序使用此处找到的 Node-ZMQ 绑定处理来自另一个应用程序的一些 ZeroMQ 事件:https://github.com/JustinTulloss/zeromq.node

我 运行 遇到的问题是一个事件的操作之一需要很长时间才能处理,这似乎会阻止在此期间处理任何其他事件。虽然该应用程序目前没有集群,但这样做只会增加几个线程,并不能真正解决问题。我想知道是否有一种方法允许这些异步调用在它们处理时不阻止其他传入请求,以及我将如何实现它们。

这是我目前正在做的高度 condensed/contrived 代码示例:

var zmq = require('zmq');
var zmqResponder = zmq.socket('rep');
var Client = require('node-rest-client').Client;
var client = new Client();

zmqResponder.on('message', function (msg, data) {
  var parsed = JSON.parse(msg);
  logging.info('ZMQ Request received: ' + parsed.event);
  switch (parsed.event) {
    case 'create':
        //Typically short running process, not an issue
    case 'update':
        //Long running process this is the issue
        serverRequest().then(function(response){
            zmqResponder.send(JSON.stringify(response));
        });
     }

});

function serverRequest(){
    var deferred = Q.defer();
      client.get(function (data, response) {
      if (response.statusCode !== 200) {
        deferred.reject(data.data);
      } else {
        deferred.resolve(data.data);
      }
  });
    return deferred.promise;
}

编辑** 这是代码的要点:https://gist.github.com/battlecow/cd0c2233e9f197ec0049

我想,通过评论线程,我已经确定了您的问题。 REQ/REP 有严格的同步消息顺序保证...你必须 receive-send-receive-send-etc。 REQ 必须以发送开头,REP 必须以接收开头。因此,您一次只能处理一条消息,因为您选择的套接字类型强制执行了这一点。

如果您使用的是另一种 non-event-driven 语言,当您尝试连续发送或接收两次时,您可能会收到一条错误消息,告诉您做错了什么,但节点允许您这样做,然后将后续消息排队,直到按照消息顺序轮到它们为止。

您想将 REQ/REP 更改为 DEALER/ROUTER,它将按您预期的方式工作。您必须稍微更改 ROUTER 套接字的逻辑才能使其正确发送,但其他一切都应该相同。


粗略的示例代码,使用已发布要点的相关部分:

var zmqResponder = zmq.socket('router');

zmqResponder.on('message', function (msg, data) {
    var peer_id = msg[0];
    var parsed = JSON.parse(msg[1]);
    switch (parsed.event) {
        case 'create':
            // build parsedResponse, then...
            zmqResponder.send([peer_id, JSON.stringify(parsedResponse)]);
            break;
    }
});

zmqResponder.bind('tcp://*:5668', function (err) {
    if (err) {
        logging.error(err);
    } else {
        logging.info("ZMQ awaiting orders on port 5668");
    }
});

...您需要获取 peer_id(或任何您想调用的名称,在 ZMQ 命名法中,它是您发送的套接字的套接字 ID,将其视为 "address" of sorts) 从你收到的消息的第一帧开始,然后使用发送它作为你发回的消息的第一帧。

顺便说一句,我刚刚注意到在你的要点中你在同一个套接字上同时 connect()-ing 和 bind()-ing(分别为 zmq.js 第 52 和 143 行) .不要那样做。从其他线索推断,你只是想bind()这边的流程。