使用 NodeJS 发送到 ZeroMQ 云后端

Send to ZeroMQ cloud backend with NodeJS

我正在学习 ZeroMQ 并从头开始制作指南中的所有示例(在 NodeJS 中)。但我与一个创建本地集群来处理作业的堆栈并存,如果它不能再处理,则将作业发送到通过其他套接字连接的“云对等点”(“Putting it All Together” example)。

我请求你帮助调试代码以查看 为什么客户端的消息不被云对等点处理(和 returned):

  1. workers连接时,发送READY消息让localBE有多少。此消息发布给所有云对等点(附有对等点名称)。
  2. 当客户端发送 REQuest 时,它会被 localFE 接收。
  3. LocalFE ROUTE 到 localBE 如果有可用的本地工作人员。否则 ROUTE 到随机 cloudBE 对等点。
  4. 据推测,cloudFE 会收到该消息并向其本地工作人员发送 ROUTE(如果可用)。然后它应该return到原始对等方的客户端(¡!)

如果你clone and execute my repocd第3章然后两个终端node peering3.js me younode peering3.js you me为例),您可以关注谁发送和接收 (Get) 数据。

你可以玩一下 NBR_CLIENTSNBR_WORKERS(第 12 和 13 行),发现作业不正确 sent/returned…

如果你能看一下我的代码,我将不胜感激!

提前致谢...

您在云代理之间进行寻址时遇到问题。您正在使用本地 workers ID 在云代理之间进行寻址。代码注释以突出问题。

//  - Route any request locally if we can, else to cloud.
localfe.on('message', function() {
    var args = Array.apply(null, arguments);
    console.log('LocalFE: Get  ' + args.toString());

    if (localCapacity > 0) {
        console.log('LocalFE: Send '+ workers.shift() + ',\'\',' + args[0]+ ',\'\','+ args[2] + ' to LocalBE');
        localCapacity--;
        statebe.send(localCapacity);
        localbe.send([workers.shift(), '', args[0], '', args[2]]);
    } else {
        //  Route to random broker peer automatically
        var randomPeer = randomBetween(3, argc);
        var peer = process.argv[randomPeer];

        /////////////////
        // why are you referencing `workers` here, that is only for local workers
        // You correctly route to `peer` here, though, so that should be fine
        // however, you've removed a local worker from the queue erroneously
        /////////////////
        console.log('LocalFE: Send '+ workers.shift() + ',\'\',' + args[0]+ ',\'\','+ args[2] + ' to CloudBE at peer ' + peer);
        cloudbe.send([peer, '', args[0], '', args[2]]);
    }
});

... cloudfe 然后正确地将消息路由到本地工作人员,但在这样做之前它不会检查 workers 队列中的可用工作人员,因此如果所有本地工作人员工作人员被带走,然后你就会感到无聊,本地 workers 队列将是空的,消息也不会去任何地方。您还丢失了对云对等点的引用,因此当消息返回时,无法知道它需要返回到云对等点:

cloudfe.on('message', function() {
    var args = Array.apply(null, arguments);
    console.log('CloudFE: Get  ' + args.toString());

    /////////////////
    // if `workers` is already empty, `shift()`ing it will get you `undefined`
    // also, you're removing the ID from the queue, which causes problems below
    /////////////////
    console.log('CloudFE: Send '+ workers.shift() + ',\'\',' + args[2]+ ',\'\','+ args[4] + ' to LocalBE');
    localCapacity--;
    statebe.send(localCapacity);

    /////////////////
    // you're now sending it to a *different* worker than you logged above
    // and you've removed *two* workers from the queue instead of one
    // as said above, if `workers` is already empty, you'll route it nowhere
    // and lose the message
    // further, nowhere in here are you logging the identity of the cloud peer, so you've 
    // lost the ability to route it back to the cloud peer that has the client
    /////////////////
    localbe.send([workers.shift(), '', args[2], '', args[4]]);
});

... worker 应该处理消息并将其成功发送回其本地代理,至少对于前两条消息(不是 5,因为我们只将它发送给 worker 2 和 4)。但是我们不仅失去了对之前向我们发送消息的云代理的引用,而且当我们从工作人员那里收到消息时,我们甚至没有尝试将其发回:

//  Reply from local worker.
localbe.on('message', function() {
    var args = Array.apply(null, arguments);
    //console.log('LocalBE: Get  ' + args.toString());
    workers.push(args[0]); // Add its identity to the array.

    //  We broadcast new capacity messages to other peers.
    localCapacity++;
    statebe.send(localCapacity);

    //  If it's not READY message, route the reply to client.
    if (args[2].toString() != WORKER_READY) {
        console.log('LocalBE: Send ' + args[2].toString() + ',\'\', ' + args[4].toString());

        /////////////////
        // you're attempting to send it directly back to the client, but the client
        // you're addressing is not `connect()`ed to this broker, it's connected to
        // the cloud broker, so it goes nowhere
        /////////////////
        localfe.send([args[2], '', args[4]]);
    }
});

所以:

  1. localfe 发送到云代理时,不要 shift() 您的工作人员排队 - 您可能甚至在有机会向他们发送工作之前就失去了所有本地工作人员
  2. cloudfe 上开始收到消息时,shift() 将工人 ID 关闭 一次 到局部变量中,并在任何地方使用该局部变量需要它
  3. 捕获您的云对等 ID 并将其添加到消息中,以便您知道哪个对等发起了云请求。
  4. 如果队列中没有可用的工作人员,请保留它并执行 setTimeout() 再试一次,或者将其发送到新的云对等点。为了简单起见,我建议使用前者,否则您可能必须在消息中跟踪一整列云对等 ID。
  5. 当收到来自工作人员的消息时,检查云对等 ID,如果找到,则将其适当路由回,而不是盲目地将其路由回可能通过不同云代理连接的客户端。