RabbitMQ for NodeJS with Express 路由

RabbitMQ for NodeJS with Express routing

我的服务器是 运行 NodeJS 并使用 amqplib api 从另一个应用程序请求数据。 NodeJS 服务器正在成功接收信息,但存在明显的延迟,我正在尝试确定我是否以最有效的方式执行此操作。具体来说,我关心的是打开和关闭连接的方式。
项目布局
我有两个控制器文件来处理接收和请求数据,request.img.server.controller.js 和 receive.img.server.controller.js。最后,当按下前端按钮时,路由会处理控制器方法,oct.server.routes.js.

request.img.server.controller.js

'use strict';

var amqp = require('amqplib/callback_api');
var connReady = false;
var conn, ch;
amqp.connect('amqp://localhost:5672', function(err, connection) {
    conn = connection;
    connReady = true;
    conn.createChannel(function(err, channel) {
        ch = channel;
    });
});


exports.sendRequest = function(message) {
    console.log('sending request');

    if(connReady) {
        var ex = '';
        var key = 'utils';

        ch.publish(ex, key, new Buffer(message));
        console.log(" [x] Sent %s: '%s'", key, message);
    }
};

receive.img.server.controller.js

var amqp = require('amqplib/callback_api');
var fs = require('fs');
var wstream = fs.createWriteStream('C:\Users\yako\desktop\binarytest.txt');

var image, rows, cols;
exports.getResponse = function(resCallback) {
    amqp.connect('amqp://localhost:5672', function(err, conn) {
        conn.createChannel(function(err, ch) {
            var ex = '';

            ch.assertQueue('server', {}, function(err, q) {
                console.log('waiting for images');
                var d = new Date();
                var n = d.getTime();
                ch.consume(q.queue, function(msg) {
                    console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toJSON());
                    rows = msg.content.readInt16LE(0);
                    cols = msg.content.readInt16LE(2);
                    console.log("rows = %s", msg.content.readInt16LE(0));
                    console.log("cols = %s", msg.content.readInt16LE(2));
                    image = msg.content;
                    var currMax = 0;
                    for (var i = 4; i < image.length; i+=2) {
                        if (image.readInt16LE(i) > currMax) {
                            currMax = image.readInt16LE(i);
                        }
                        wstream.write(image.readInt16LE(i) + ',');
                    }
                    console.log('done writing max is', currMax);
                    //console.log(image);
                    resCallback(rows, cols, image);
                }, {
                    noAck: true
                });
            });
        });
    });
};

oct.server.routes.js

'use strict';

module.exports = function(app) {
    var request_img = require('../../app/controllers/image-tools/request.img.server.controller.js');
    var receive_img = require('../../app/controllers/image-tools/receive.img.server.controller.js');

    // oct routes
    app.get('/load_slice', function(req, res) {
        console.log('load slice hit');
        receive_img.getResponse(function (rows, cols, image) {
            res.end(image);
        });
        request_img.sendRequest('123:C:\Users\yako\Documents\Developer\medicaldiag\test_files\RUS-01-035-09M-21.oct');
    });
};

您打开连接的方式很糟糕,这至少是性能问题的一部分。

打开连接的成本很高。他们在客户端和 rabbitmq 服务器之间的 TCP/IP 端口上打开一个新的 TCP/IP 连接。这需要时间,并且会占用客户端和服务器上的有限资源。

因此,应该在每个 node.js 进程中创建和使用到 RabbitMQ 的单个连接。该连接应由该进程中的所有代码共享。

每当您需要使用 RabbitMQ 执行某些操作时,请在共享连接上打开一个新通道并执行您的工作。通道价格低廉,旨在根据需要在连接内打开和关闭。

更具体地说,在您的代码中,receive.img.server.controller.js 文件是主要问题。这会在您每次调用 getResponse 方法时打开一个到 RabbitMQ 的新连接。

如果您有 10 个用户访问该站点,您将有 10 个打开的 RabbitMQ 连接,而 1 个连接就足够了。如果您有成千上万的用户访问该站点,那么当 1 个就足够时,您将有数千个打开的 RabbitMQ 连接。您还 运行 耗尽 RabbitMQ 服务器或客户端上的可用 TCP/IP 连接的风险。

您的 receive.img.server.controller.js 应该看起来更像您的 request.img.server.controller.js - 打开一个连接,并一直重复使用。


此外,FWIW - 我建议使用 the wascally library 用于带有 node.js 的 RabbitMQ。这个库位于 amqplib 之上,但使事情变得容易得多。它将为您管理您的一个连接,并使您更轻松地发送和接收消息。

我也有一些培训 material 可用于 RabbitMQ and node.js,涵盖 amqplib 的基础知识,然后转向使用 wascally 进行实际应用程序开发。